aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_smtp.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_smtp.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_smtp.py1520
1 files changed, 0 insertions, 1520 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_smtp.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_smtp.py
deleted file mode 100755
index 058bb8ec..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_smtp.py
+++ /dev/null
@@ -1,1520 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Test cases for twisted.mail.smtp module.
-"""
-
-from zope.interface import implements
-
-from twisted.python.util import LineLog
-from twisted.trial import unittest, util
-from twisted.protocols import basic, loopback
-from twisted.mail import smtp
-from twisted.internet import defer, protocol, reactor, interfaces
-from twisted.internet import address, error, task
-from twisted.test.proto_helpers import StringTransport
-
-from twisted import cred
-import twisted.cred.error
-import twisted.cred.portal
-import twisted.cred.checkers
-import twisted.cred.credentials
-
-from twisted.cred.portal import IRealm, Portal
-from twisted.cred.checkers import ICredentialsChecker, AllowAnonymousAccess
-from twisted.cred.credentials import IAnonymous
-from twisted.cred.error import UnauthorizedLogin
-
-from twisted.mail import imap4
-
-
-try:
- from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
-except ImportError:
- ClientTLSContext = ServerTLSContext = None
-
-import re
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-
-
-def spameater(*spam, **eggs):
- return None
-
-
-
-class BrokenMessage(object):
- """
- L{BrokenMessage} is an L{IMessage} which raises an unexpected exception
- from its C{eomReceived} method. This is useful for creating a server which
- can be used to test client retry behavior.
- """
- implements(smtp.IMessage)
-
- def __init__(self, user):
- pass
-
-
- def lineReceived(self, line):
- pass
-
-
- def eomReceived(self):
- raise RuntimeError("Some problem, delivery is failing.")
-
-
- def connectionLost(self):
- pass
-
-
-
-class DummyMessage(object):
- """
- L{BrokenMessage} is an L{IMessage} which saves the message delivered to it
- to its domain object.
-
- @ivar domain: A L{DummyDomain} which will be used to store the message once
- it is received.
- """
- def __init__(self, domain, user):
- self.domain = domain
- self.user = user
- self.buffer = []
-
-
- def lineReceived(self, line):
- # Throw away the generated Received: header
- if not re.match('Received: From yyy.com \(\[.*\]\) by localhost;', line):
- self.buffer.append(line)
-
-
- def eomReceived(self):
- message = '\n'.join(self.buffer) + '\n'
- self.domain.messages[self.user.dest.local].append(message)
- deferred = defer.Deferred()
- deferred.callback("saved")
- return deferred
-
-
-
-class DummyDomain(object):
- """
- L{DummyDomain} is an L{IDomain} which keeps track of messages delivered to
- it in memory.
- """
- def __init__(self, names):
- self.messages = {}
- for name in names:
- self.messages[name] = []
-
-
- def exists(self, user):
- if user.dest.local in self.messages:
- return defer.succeed(lambda: self.startMessage(user))
- return defer.fail(smtp.SMTPBadRcpt(user))
-
-
- def startMessage(self, user):
- return DummyMessage(self, user)
-
-
-
-class SMTPTestCase(unittest.TestCase):
-
- messages = [('foo@bar.com', ['foo@baz.com', 'qux@baz.com'], '''\
-Subject: urgent\015
-\015
-Someone set up us the bomb!\015
-''')]
-
- mbox = {'foo': ['Subject: urgent\n\nSomeone set up us the bomb!\n']}
-
- def setUp(self):
- """
- Create an in-memory mail domain to which messages may be delivered by
- tests and create a factory and transport to do the delivering.
- """
- self.factory = smtp.SMTPFactory()
- self.factory.domains = {}
- self.factory.domains['baz.com'] = DummyDomain(['foo'])
- self.transport = StringTransport()
-
-
- def testMessages(self):
- from twisted.mail import protocols
- protocol = protocols.DomainSMTP()
- protocol.service = self.factory
- protocol.factory = self.factory
- protocol.receivedHeader = spameater
- protocol.makeConnection(self.transport)
- protocol.lineReceived('HELO yyy.com')
- for message in self.messages:
- protocol.lineReceived('MAIL FROM:<%s>' % message[0])
- for target in message[1]:
- protocol.lineReceived('RCPT TO:<%s>' % target)
- protocol.lineReceived('DATA')
- protocol.dataReceived(message[2])
- protocol.lineReceived('.')
- protocol.lineReceived('QUIT')
- if self.mbox != self.factory.domains['baz.com'].messages:
- raise AssertionError(self.factory.domains['baz.com'].messages)
- protocol.setTimeout(None)
-
- testMessages.suppress = [util.suppress(message='DomainSMTP', category=DeprecationWarning)]
-
-mail = '''\
-Subject: hello
-
-Goodbye
-'''
-
-class MyClient:
- def __init__(self, messageInfo=None):
- if messageInfo is None:
- messageInfo = (
- 'moshez@foo.bar', ['moshez@foo.bar'], StringIO(mail))
- self._sender = messageInfo[0]
- self._recipient = messageInfo[1]
- self._data = messageInfo[2]
-
-
- def getMailFrom(self):
- return self._sender
-
-
- def getMailTo(self):
- return self._recipient
-
-
- def getMailData(self):
- return self._data
-
-
- def sendError(self, exc):
- self._error = exc
-
-
- def sentMail(self, code, resp, numOk, addresses, log):
- # Prevent another mail from being sent.
- self._sender = None
- self._recipient = None
- self._data = None
-
-
-
-class MySMTPClient(MyClient, smtp.SMTPClient):
- def __init__(self, messageInfo=None):
- smtp.SMTPClient.__init__(self, 'foo.baz')
- MyClient.__init__(self, messageInfo)
-
-class MyESMTPClient(MyClient, smtp.ESMTPClient):
- def __init__(self, secret = '', contextFactory = None):
- smtp.ESMTPClient.__init__(self, secret, contextFactory, 'foo.baz')
- MyClient.__init__(self)
-
-class LoopbackMixin:
- def loopback(self, server, client):
- return loopback.loopbackTCP(server, client)
-
-class LoopbackTestCase(LoopbackMixin):
- def testMessages(self):
- factory = smtp.SMTPFactory()
- factory.domains = {}
- factory.domains['foo.bar'] = DummyDomain(['moshez'])
- from twisted.mail.protocols import DomainSMTP
- protocol = DomainSMTP()
- protocol.service = factory
- protocol.factory = factory
- clientProtocol = self.clientClass()
- return self.loopback(protocol, clientProtocol)
- testMessages.suppress = [util.suppress(message='DomainSMTP', category=DeprecationWarning)]
-
-class LoopbackSMTPTestCase(LoopbackTestCase, unittest.TestCase):
- clientClass = MySMTPClient
-
-class LoopbackESMTPTestCase(LoopbackTestCase, unittest.TestCase):
- clientClass = MyESMTPClient
-
-
-class FakeSMTPServer(basic.LineReceiver):
-
- clientData = [
- '220 hello', '250 nice to meet you',
- '250 great', '250 great', '354 go on, lad'
- ]
-
- def connectionMade(self):
- self.buffer = []
- self.clientData = self.clientData[:]
- self.clientData.reverse()
- self.sendLine(self.clientData.pop())
-
- def lineReceived(self, line):
- self.buffer.append(line)
- if line == "QUIT":
- self.transport.write("221 see ya around\r\n")
- self.transport.loseConnection()
- elif line == ".":
- self.transport.write("250 gotcha\r\n")
- elif line == "RSET":
- self.transport.loseConnection()
-
- if self.clientData:
- self.sendLine(self.clientData.pop())
-
-
-class SMTPClientTestCase(unittest.TestCase, LoopbackMixin):
- """
- Tests for L{smtp.SMTPClient}.
- """
-
- def test_timeoutConnection(self):
- """
- L{smtp.SMTPClient.timeoutConnection} calls the C{sendError} hook with a
- fatal L{SMTPTimeoutError} with the current line log.
- """
- error = []
- client = MySMTPClient()
- client.sendError = error.append
- client.makeConnection(StringTransport())
- client.lineReceived("220 hello")
- client.timeoutConnection()
- self.assertIsInstance(error[0], smtp.SMTPTimeoutError)
- self.assertTrue(error[0].isFatal)
- self.assertEqual(
- str(error[0]),
- "Timeout waiting for SMTP server response\n"
- "<<< 220 hello\n"
- ">>> HELO foo.baz\n")
-
-
- expected_output = [
- 'HELO foo.baz', 'MAIL FROM:<moshez@foo.bar>',
- 'RCPT TO:<moshez@foo.bar>', 'DATA',
- 'Subject: hello', '', 'Goodbye', '.', 'RSET'
- ]
-
- def test_messages(self):
- """
- L{smtp.SMTPClient} sends I{HELO}, I{MAIL FROM}, I{RCPT TO}, and I{DATA}
- commands based on the return values of its C{getMailFrom},
- C{getMailTo}, and C{getMailData} methods.
- """
- client = MySMTPClient()
- server = FakeSMTPServer()
- d = self.loopback(server, client)
- d.addCallback(lambda x :
- self.assertEqual(server.buffer, self.expected_output))
- return d
-
-
- def test_transferError(self):
- """
- If there is an error while producing the message body to the
- connection, the C{sendError} callback is invoked.
- """
- client = MySMTPClient(
- ('alice@example.com', ['bob@example.com'], StringIO("foo")))
- transport = StringTransport()
- client.makeConnection(transport)
- client.dataReceived(
- '220 Ok\r\n' # Greeting
- '250 Ok\r\n' # EHLO response
- '250 Ok\r\n' # MAIL FROM response
- '250 Ok\r\n' # RCPT TO response
- '354 Ok\r\n' # DATA response
- )
-
- # Sanity check - a pull producer should be registered now.
- self.assertNotIdentical(transport.producer, None)
- self.assertFalse(transport.streaming)
-
- # Now stop the producer prematurely, meaning the message was not sent.
- transport.producer.stopProducing()
-
- # The sendError hook should have been invoked as a result.
- self.assertIsInstance(client._error, Exception)
-
-
- def test_sendFatalError(self):
- """
- If L{smtp.SMTPClient.sendError} is called with an L{SMTPClientError}
- which is fatal, it disconnects its transport without writing anything
- more to it.
- """
- client = smtp.SMTPClient(None)
- transport = StringTransport()
- client.makeConnection(transport)
- client.sendError(smtp.SMTPClientError(123, "foo", isFatal=True))
- self.assertEqual(transport.value(), "")
- self.assertTrue(transport.disconnecting)
-
-
- def test_sendNonFatalError(self):
- """
- If L{smtp.SMTPClient.sendError} is called with an L{SMTPClientError}
- which is not fatal, it sends C{"QUIT"} and waits for the server to
- close the connection.
- """
- client = smtp.SMTPClient(None)
- transport = StringTransport()
- client.makeConnection(transport)
- client.sendError(smtp.SMTPClientError(123, "foo", isFatal=False))
- self.assertEqual(transport.value(), "QUIT\r\n")
- self.assertFalse(transport.disconnecting)
-
-
- def test_sendOtherError(self):
- """
- If L{smtp.SMTPClient.sendError} is called with an exception which is
- not an L{SMTPClientError}, it disconnects its transport without
- writing anything more to it.
- """
- client = smtp.SMTPClient(None)
- transport = StringTransport()
- client.makeConnection(transport)
- client.sendError(Exception("foo"))
- self.assertEqual(transport.value(), "")
- self.assertTrue(transport.disconnecting)
-
-
-
-class DummySMTPMessage:
-
- def __init__(self, protocol, users):
- self.protocol = protocol
- self.users = users
- self.buffer = []
-
- def lineReceived(self, line):
- self.buffer.append(line)
-
- def eomReceived(self):
- message = '\n'.join(self.buffer) + '\n'
- helo, origin = self.users[0].helo[0], str(self.users[0].orig)
- recipients = []
- for user in self.users:
- recipients.append(str(user))
- self.protocol.message[tuple(recipients)] = (helo, origin, recipients, message)
- return defer.succeed("saved")
-
-
-
-class DummyProto:
- def connectionMade(self):
- self.dummyMixinBase.connectionMade(self)
- self.message = {}
-
- def startMessage(self, users):
- return DummySMTPMessage(self, users)
-
- def receivedHeader(*spam):
- return None
-
- def validateTo(self, user):
- self.delivery = SimpleDelivery(None)
- return lambda: self.startMessage([user])
-
- def validateFrom(self, helo, origin):
- return origin
-
-
-
-class DummySMTP(DummyProto, smtp.SMTP):
- dummyMixinBase = smtp.SMTP
-
-class DummyESMTP(DummyProto, smtp.ESMTP):
- dummyMixinBase = smtp.ESMTP
-
-class AnotherTestCase:
- serverClass = None
- clientClass = None
-
- messages = [ ('foo.com', 'moshez@foo.com', ['moshez@bar.com'],
- 'moshez@foo.com', ['moshez@bar.com'], '''\
-From: Moshe
-To: Moshe
-
-Hi,
-how are you?
-'''),
- ('foo.com', 'tttt@rrr.com', ['uuu@ooo', 'yyy@eee'],
- 'tttt@rrr.com', ['uuu@ooo', 'yyy@eee'], '''\
-Subject: pass
-
-..rrrr..
-'''),
- ('foo.com', '@this,@is,@ignored:foo@bar.com',
- ['@ignore,@this,@too:bar@foo.com'],
- 'foo@bar.com', ['bar@foo.com'], '''\
-Subject: apa
-To: foo
-
-123
-.
-456
-'''),
- ]
-
- data = [
- ('', '220.*\r\n$', None, None),
- ('HELO foo.com\r\n', '250.*\r\n$', None, None),
- ('RSET\r\n', '250.*\r\n$', None, None),
- ]
- for helo_, from_, to_, realfrom, realto, msg in messages:
- data.append(('MAIL FROM:<%s>\r\n' % from_, '250.*\r\n',
- None, None))
- for rcpt in to_:
- data.append(('RCPT TO:<%s>\r\n' % rcpt, '250.*\r\n',
- None, None))
-
- data.append(('DATA\r\n','354.*\r\n',
- msg, ('250.*\r\n',
- (helo_, realfrom, realto, msg))))
-
-
- def test_buffer(self):
- """
- Exercise a lot of the SMTP client code. This is a "shotgun" style unit
- test. It does a lot of things and hopes that something will go really
- wrong if it is going to go wrong. This test should be replaced with a
- suite of nicer tests.
- """
- transport = StringTransport()
- a = self.serverClass()
- class fooFactory:
- domain = 'foo.com'
-
- a.factory = fooFactory()
- a.makeConnection(transport)
- for (send, expect, msg, msgexpect) in self.data:
- if send:
- a.dataReceived(send)
- data = transport.value()
- transport.clear()
- if not re.match(expect, data):
- raise AssertionError, (send, expect, data)
- if data[:3] == '354':
- for line in msg.splitlines():
- if line and line[0] == '.':
- line = '.' + line
- a.dataReceived(line + '\r\n')
- a.dataReceived('.\r\n')
- # Special case for DATA. Now we want a 250, and then
- # we compare the messages
- data = transport.value()
- transport.clear()
- resp, msgdata = msgexpect
- if not re.match(resp, data):
- raise AssertionError, (resp, data)
- for recip in msgdata[2]:
- expected = list(msgdata[:])
- expected[2] = [recip]
- self.assertEqual(
- a.message[(recip,)],
- tuple(expected)
- )
- a.setTimeout(None)
-
-
-class AnotherESMTPTestCase(AnotherTestCase, unittest.TestCase):
- serverClass = DummyESMTP
- clientClass = MyESMTPClient
-
-class AnotherSMTPTestCase(AnotherTestCase, unittest.TestCase):
- serverClass = DummySMTP
- clientClass = MySMTPClient
-
-
-
-class DummyChecker:
- implements(cred.checkers.ICredentialsChecker)
-
- users = {
- 'testuser': 'testpassword'
- }
-
- credentialInterfaces = (cred.credentials.IUsernamePassword,
- cred.credentials.IUsernameHashedPassword)
-
- def requestAvatarId(self, credentials):
- return defer.maybeDeferred(
- credentials.checkPassword, self.users[credentials.username]
- ).addCallback(self._cbCheck, credentials.username)
-
- def _cbCheck(self, result, username):
- if result:
- return username
- raise cred.error.UnauthorizedLogin()
-
-
-
-class SimpleDelivery(object):
- """
- L{SimpleDelivery} is a message delivery factory with no interesting
- behavior.
- """
- implements(smtp.IMessageDelivery)
-
- def __init__(self, messageFactory):
- self._messageFactory = messageFactory
-
-
- def receivedHeader(self, helo, origin, recipients):
- return None
-
-
- def validateFrom(self, helo, origin):
- return origin
-
-
- def validateTo(self, user):
- return lambda: self._messageFactory(user)
-
-
-
-class DummyRealm:
- def requestAvatar(self, avatarId, mind, *interfaces):
- return smtp.IMessageDelivery, SimpleDelivery(None), lambda: None
-
-
-
-class AuthTestCase(unittest.TestCase, LoopbackMixin):
- def test_crammd5Auth(self):
- """
- L{ESMTPClient} can authenticate using the I{CRAM-MD5} SASL mechanism.
-
- @see: U{http://tools.ietf.org/html/rfc2195}
- """
- realm = DummyRealm()
- p = cred.portal.Portal(realm)
- p.registerChecker(DummyChecker())
-
- server = DummyESMTP({'CRAM-MD5': cred.credentials.CramMD5Credentials})
- server.portal = p
- client = MyESMTPClient('testpassword')
-
- cAuth = smtp.CramMD5ClientAuthenticator('testuser')
- client.registerAuthenticator(cAuth)
-
- d = self.loopback(server, client)
- d.addCallback(lambda x : self.assertEqual(server.authenticated, 1))
- return d
-
-
- def test_loginAuth(self):
- """
- L{ESMTPClient} can authenticate using the I{LOGIN} SASL mechanism.
-
- @see: U{http://sepp.oetiker.ch/sasl-2.1.19-ds/draft-murchison-sasl-login-00.txt}
- """
- realm = DummyRealm()
- p = cred.portal.Portal(realm)
- p.registerChecker(DummyChecker())
-
- server = DummyESMTP({'LOGIN': imap4.LOGINCredentials})
- server.portal = p
- client = MyESMTPClient('testpassword')
-
- cAuth = smtp.LOGINAuthenticator('testuser')
- client.registerAuthenticator(cAuth)
-
- d = self.loopback(server, client)
- d.addCallback(lambda x: self.assertTrue(server.authenticated))
- return d
-
-
- def test_loginAgainstWeirdServer(self):
- """
- When communicating with a server which implements the I{LOGIN} SASL
- mechanism using C{"Username:"} as the challenge (rather than C{"User
- Name\\0"}), L{ESMTPClient} can still authenticate successfully using
- the I{LOGIN} mechanism.
- """
- realm = DummyRealm()
- p = cred.portal.Portal(realm)
- p.registerChecker(DummyChecker())
-
- server = DummyESMTP({'LOGIN': smtp.LOGINCredentials})
- server.portal = p
-
- client = MyESMTPClient('testpassword')
- cAuth = smtp.LOGINAuthenticator('testuser')
- client.registerAuthenticator(cAuth)
-
- d = self.loopback(server, client)
- d.addCallback(lambda x: self.assertTrue(server.authenticated))
- return d
-
-
-
-class SMTPHelperTestCase(unittest.TestCase):
- def testMessageID(self):
- d = {}
- for i in range(1000):
- m = smtp.messageid('testcase')
- self.failIf(m in d)
- d[m] = None
-
- def testQuoteAddr(self):
- cases = [
- ['user@host.name', '<user@host.name>'],
- ['"User Name" <user@host.name>', '<user@host.name>'],
- [smtp.Address('someguy@someplace'), '<someguy@someplace>'],
- ['', '<>'],
- [smtp.Address(''), '<>'],
- ]
-
- for (c, e) in cases:
- self.assertEqual(smtp.quoteaddr(c), e)
-
- def testUser(self):
- u = smtp.User('user@host', 'helo.host.name', None, None)
- self.assertEqual(str(u), 'user@host')
-
- def testXtextEncoding(self):
- cases = [
- ('Hello world', 'Hello+20world'),
- ('Hello+world', 'Hello+2Bworld'),
- ('\0\1\2\3\4\5', '+00+01+02+03+04+05'),
- ('e=mc2@example.com', 'e+3Dmc2@example.com')
- ]
-
- for (case, expected) in cases:
- self.assertEqual(smtp.xtext_encode(case), (expected, len(case)))
- self.assertEqual(case.encode('xtext'), expected)
- self.assertEqual(
- smtp.xtext_decode(expected), (case, len(expected)))
- self.assertEqual(expected.decode('xtext'), case)
-
-
- def test_encodeWithErrors(self):
- """
- Specifying an error policy to C{unicode.encode} with the
- I{xtext} codec should produce the same result as not
- specifying the error policy.
- """
- text = u'Hello world'
- self.assertEqual(
- smtp.xtext_encode(text, 'strict'),
- (text.encode('xtext'), len(text)))
- self.assertEqual(
- text.encode('xtext', 'strict'),
- text.encode('xtext'))
-
-
- def test_decodeWithErrors(self):
- """
- Similar to L{test_encodeWithErrors}, but for C{str.decode}.
- """
- bytes = 'Hello world'
- self.assertEqual(
- smtp.xtext_decode(bytes, 'strict'),
- (bytes.decode('xtext'), len(bytes)))
- self.assertEqual(
- bytes.decode('xtext', 'strict'),
- bytes.decode('xtext'))
-
-
-
-class NoticeTLSClient(MyESMTPClient):
- tls = False
-
- def esmtpState_starttls(self, code, resp):
- MyESMTPClient.esmtpState_starttls(self, code, resp)
- self.tls = True
-
-class TLSTestCase(unittest.TestCase, LoopbackMixin):
- def testTLS(self):
- clientCTX = ClientTLSContext()
- serverCTX = ServerTLSContext()
-
- client = NoticeTLSClient(contextFactory=clientCTX)
- server = DummyESMTP(contextFactory=serverCTX)
-
- def check(ignored):
- self.assertEqual(client.tls, True)
- self.assertEqual(server.startedTLS, True)
-
- return self.loopback(server, client).addCallback(check)
-
-if ClientTLSContext is None:
- for case in (TLSTestCase,):
- case.skip = "OpenSSL not present"
-
-if not interfaces.IReactorSSL.providedBy(reactor):
- for case in (TLSTestCase,):
- case.skip = "Reactor doesn't support SSL"
-
-class EmptyLineTestCase(unittest.TestCase):
- def test_emptyLineSyntaxError(self):
- """
- If L{smtp.SMTP} receives an empty line, it responds with a 500 error
- response code and a message about a syntax error.
- """
- proto = smtp.SMTP()
- transport = StringTransport()
- proto.makeConnection(transport)
- proto.lineReceived('')
- proto.setTimeout(None)
-
- out = transport.value().splitlines()
- self.assertEqual(len(out), 2)
- self.failUnless(out[0].startswith('220'))
- self.assertEqual(out[1], "500 Error: bad syntax")
-
-
-
-class TimeoutTestCase(unittest.TestCase, LoopbackMixin):
- """
- Check that SMTP client factories correctly use the timeout.
- """
-
- def _timeoutTest(self, onDone, clientFactory):
- """
- Connect the clientFactory, and check the timeout on the request.
- """
- clock = task.Clock()
- client = clientFactory.buildProtocol(
- address.IPv4Address('TCP', 'example.net', 25))
- client.callLater = clock.callLater
- t = StringTransport()
- client.makeConnection(t)
- t.protocol = client
- def check(ign):
- self.assertEqual(clock.seconds(), 0.5)
- d = self.assertFailure(onDone, smtp.SMTPTimeoutError
- ).addCallback(check)
- # The first call should not trigger the timeout
- clock.advance(0.1)
- # But this one should
- clock.advance(0.4)
- return d
-
-
- def test_SMTPClient(self):
- """
- Test timeout for L{smtp.SMTPSenderFactory}: the response L{Deferred}
- should be errback with a L{smtp.SMTPTimeoutError}.
- """
- onDone = defer.Deferred()
- clientFactory = smtp.SMTPSenderFactory(
- 'source@address', 'recipient@address',
- StringIO("Message body"), onDone,
- retries=0, timeout=0.5)
- return self._timeoutTest(onDone, clientFactory)
-
-
- def test_ESMTPClient(self):
- """
- Test timeout for L{smtp.ESMTPSenderFactory}: the response L{Deferred}
- should be errback with a L{smtp.SMTPTimeoutError}.
- """
- onDone = defer.Deferred()
- clientFactory = smtp.ESMTPSenderFactory(
- 'username', 'password',
- 'source@address', 'recipient@address',
- StringIO("Message body"), onDone,
- retries=0, timeout=0.5)
- return self._timeoutTest(onDone, clientFactory)
-
-
- def test_resetTimeoutWhileSending(self):
- """
- The timeout is not allowed to expire after the server has accepted a
- DATA command and the client is actively sending data to it.
- """
- class SlowFile:
- """
- A file-like which returns one byte from each read call until the
- specified number of bytes have been returned.
- """
- def __init__(self, size):
- self._size = size
-
- def read(self, max=None):
- if self._size:
- self._size -= 1
- return 'x'
- return ''
-
- failed = []
- onDone = defer.Deferred()
- onDone.addErrback(failed.append)
- clientFactory = smtp.SMTPSenderFactory(
- 'source@address', 'recipient@address',
- SlowFile(1), onDone, retries=0, timeout=3)
- clientFactory.domain = "example.org"
- clock = task.Clock()
- client = clientFactory.buildProtocol(
- address.IPv4Address('TCP', 'example.net', 25))
- client.callLater = clock.callLater
- transport = StringTransport()
- client.makeConnection(transport)
-
- client.dataReceived(
- "220 Ok\r\n" # Greet the client
- "250 Ok\r\n" # Respond to HELO
- "250 Ok\r\n" # Respond to MAIL FROM
- "250 Ok\r\n" # Respond to RCPT TO
- "354 Ok\r\n" # Respond to DATA
- )
-
- # Now the client is producing data to the server. Any time
- # resumeProducing is called on the producer, the timeout should be
- # extended. First, a sanity check. This test is only written to
- # handle pull producers.
- self.assertNotIdentical(transport.producer, None)
- self.assertFalse(transport.streaming)
-
- # Now, allow 2 seconds (1 less than the timeout of 3 seconds) to
- # elapse.
- clock.advance(2)
-
- # The timeout has not expired, so the failure should not have happened.
- self.assertEqual(failed, [])
-
- # Let some bytes be produced, extending the timeout. Then advance the
- # clock some more and verify that the timeout still hasn't happened.
- transport.producer.resumeProducing()
- clock.advance(2)
- self.assertEqual(failed, [])
-
- # The file has been completely produced - the next resume producing
- # finishes the upload, successfully.
- transport.producer.resumeProducing()
- client.dataReceived("250 Ok\r\n")
- self.assertEqual(failed, [])
-
- # Verify that the client actually did send the things expected.
- self.assertEqual(
- transport.value(),
- "HELO example.org\r\n"
- "MAIL FROM:<source@address>\r\n"
- "RCPT TO:<recipient@address>\r\n"
- "DATA\r\n"
- "x\r\n"
- ".\r\n"
- # This RSET is just an implementation detail. It's nice, but this
- # test doesn't really care about it.
- "RSET\r\n")
-
-
-
-class MultipleDeliveryFactorySMTPServerFactory(protocol.ServerFactory):
- """
- L{MultipleDeliveryFactorySMTPServerFactory} creates SMTP server protocol
- instances with message delivery factory objects supplied to it. Each
- factory is used for one connection and then discarded. Factories are used
- in the order they are supplied.
- """
- def __init__(self, messageFactories):
- self._messageFactories = messageFactories
-
-
- def buildProtocol(self, addr):
- p = protocol.ServerFactory.buildProtocol(self, addr)
- p.delivery = SimpleDelivery(self._messageFactories.pop(0))
- return p
-
-
-
-class SMTPSenderFactoryRetryTestCase(unittest.TestCase):
- """
- Tests for the retry behavior of L{smtp.SMTPSenderFactory}.
- """
- def test_retryAfterDisconnect(self):
- """
- If the protocol created by L{SMTPSenderFactory} loses its connection
- before receiving confirmation of message delivery, it reconnects and
- tries to deliver the message again.
- """
- recipient = 'alice'
- message = "some message text"
- domain = DummyDomain([recipient])
-
- class CleanSMTP(smtp.SMTP):
- """
- An SMTP subclass which ensures that its transport will be
- disconnected before the test ends.
- """
- def makeConnection(innerSelf, transport):
- self.addCleanup(transport.loseConnection)
- smtp.SMTP.makeConnection(innerSelf, transport)
-
- # Create a server which will fail the first message deliver attempt to
- # it with a 500 and a disconnect, but which will accept a message
- # delivered over the 2nd connection to it.
- serverFactory = MultipleDeliveryFactorySMTPServerFactory([
- BrokenMessage,
- lambda user: DummyMessage(domain, user)])
- serverFactory.protocol = CleanSMTP
- serverPort = reactor.listenTCP(0, serverFactory, interface='127.0.0.1')
- serverHost = serverPort.getHost()
- self.addCleanup(serverPort.stopListening)
-
- # Set up a client to try to deliver a message to the above created
- # server.
- sentDeferred = defer.Deferred()
- clientFactory = smtp.SMTPSenderFactory(
- "bob@example.org", recipient + "@example.com",
- StringIO(message), sentDeferred)
- clientFactory.domain = "example.org"
- clientConnector = reactor.connectTCP(
- serverHost.host, serverHost.port, clientFactory)
- self.addCleanup(clientConnector.disconnect)
-
- def cbSent(ignored):
- """
- Verify that the message was successfully delivered and flush the
- error which caused the first attempt to fail.
- """
- self.assertEqual(
- domain.messages,
- {recipient: ["\n%s\n" % (message,)]})
- # Flush the RuntimeError that BrokenMessage caused to be logged.
- self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
- sentDeferred.addCallback(cbSent)
- return sentDeferred
-
-
-
-class SingletonRealm(object):
- """
- Trivial realm implementation which is constructed with an interface and an
- avatar and returns that avatar when asked for that interface.
- """
- implements(IRealm)
-
- def __init__(self, interface, avatar):
- self.interface = interface
- self.avatar = avatar
-
-
- def requestAvatar(self, avatarId, mind, *interfaces):
- for iface in interfaces:
- if iface is self.interface:
- return iface, self.avatar, lambda: None
-
-
-
-class NotImplementedDelivery(object):
- """
- Non-implementation of L{smtp.IMessageDelivery} which only has methods which
- raise L{NotImplementedError}. Subclassed by various tests to provide the
- particular behavior being tested.
- """
- def validateFrom(self, helo, origin):
- raise NotImplementedError("This oughtn't be called in the course of this test.")
-
-
- def validateTo(self, user):
- raise NotImplementedError("This oughtn't be called in the course of this test.")
-
-
- def receivedHeader(self, helo, origin, recipients):
- raise NotImplementedError("This oughtn't be called in the course of this test.")
-
-
-
-class SMTPServerTestCase(unittest.TestCase):
- """
- Test various behaviors of L{twisted.mail.smtp.SMTP} and
- L{twisted.mail.smtp.ESMTP}.
- """
- def testSMTPGreetingHost(self, serverClass=smtp.SMTP):
- """
- Test that the specified hostname shows up in the SMTP server's
- greeting.
- """
- s = serverClass()
- s.host = "example.com"
- t = StringTransport()
- s.makeConnection(t)
- s.connectionLost(error.ConnectionDone())
- self.assertIn("example.com", t.value())
-
-
- def testSMTPGreetingNotExtended(self):
- """
- Test that the string "ESMTP" does not appear in the SMTP server's
- greeting since that string strongly suggests the presence of support
- for various SMTP extensions which are not supported by L{smtp.SMTP}.
- """
- s = smtp.SMTP()
- t = StringTransport()
- s.makeConnection(t)
- s.connectionLost(error.ConnectionDone())
- self.assertNotIn("ESMTP", t.value())
-
-
- def testESMTPGreetingHost(self):
- """
- Similar to testSMTPGreetingHost, but for the L{smtp.ESMTP} class.
- """
- self.testSMTPGreetingHost(smtp.ESMTP)
-
-
- def testESMTPGreetingExtended(self):
- """
- Test that the string "ESMTP" does appear in the ESMTP server's
- greeting since L{smtp.ESMTP} does support the SMTP extensions which
- that advertises to the client.
- """
- s = smtp.ESMTP()
- t = StringTransport()
- s.makeConnection(t)
- s.connectionLost(error.ConnectionDone())
- self.assertIn("ESMTP", t.value())
-
-
- def test_acceptSenderAddress(self):
- """
- Test that a C{MAIL FROM} command with an acceptable address is
- responded to with the correct success code.
- """
- class AcceptanceDelivery(NotImplementedDelivery):
- """
- Delivery object which accepts all senders as valid.
- """
- def validateFrom(self, helo, origin):
- return origin
-
- realm = SingletonRealm(smtp.IMessageDelivery, AcceptanceDelivery())
- portal = Portal(realm, [AllowAnonymousAccess()])
- proto = smtp.SMTP()
- proto.portal = portal
- trans = StringTransport()
- proto.makeConnection(trans)
-
- # Deal with the necessary preliminaries
- proto.dataReceived('HELO example.com\r\n')
- trans.clear()
-
- # Try to specify our sender address
- proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
-
- # Clean up the protocol before doing anything that might raise an
- # exception.
- proto.connectionLost(error.ConnectionLost())
-
- # Make sure that we received exactly the correct response
- self.assertEqual(
- trans.value(),
- '250 Sender address accepted\r\n')
-
-
- def test_deliveryRejectedSenderAddress(self):
- """
- Test that a C{MAIL FROM} command with an address rejected by a
- L{smtp.IMessageDelivery} instance is responded to with the correct
- error code.
- """
- class RejectionDelivery(NotImplementedDelivery):
- """
- Delivery object which rejects all senders as invalid.
- """
- def validateFrom(self, helo, origin):
- raise smtp.SMTPBadSender(origin)
-
- realm = SingletonRealm(smtp.IMessageDelivery, RejectionDelivery())
- portal = Portal(realm, [AllowAnonymousAccess()])
- proto = smtp.SMTP()
- proto.portal = portal
- trans = StringTransport()
- proto.makeConnection(trans)
-
- # Deal with the necessary preliminaries
- proto.dataReceived('HELO example.com\r\n')
- trans.clear()
-
- # Try to specify our sender address
- proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
-
- # Clean up the protocol before doing anything that might raise an
- # exception.
- proto.connectionLost(error.ConnectionLost())
-
- # Make sure that we received exactly the correct response
- self.assertEqual(
- trans.value(),
- '550 Cannot receive from specified address '
- '<alice@example.com>: Sender not acceptable\r\n')
-
-
- def test_portalRejectedSenderAddress(self):
- """
- Test that a C{MAIL FROM} command with an address rejected by an
- L{smtp.SMTP} instance's portal is responded to with the correct error
- code.
- """
- class DisallowAnonymousAccess(object):
- """
- Checker for L{IAnonymous} which rejects authentication attempts.
- """
- implements(ICredentialsChecker)
-
- credentialInterfaces = (IAnonymous,)
-
- def requestAvatarId(self, credentials):
- return defer.fail(UnauthorizedLogin())
-
- realm = SingletonRealm(smtp.IMessageDelivery, NotImplementedDelivery())
- portal = Portal(realm, [DisallowAnonymousAccess()])
- proto = smtp.SMTP()
- proto.portal = portal
- trans = StringTransport()
- proto.makeConnection(trans)
-
- # Deal with the necessary preliminaries
- proto.dataReceived('HELO example.com\r\n')
- trans.clear()
-
- # Try to specify our sender address
- proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
-
- # Clean up the protocol before doing anything that might raise an
- # exception.
- proto.connectionLost(error.ConnectionLost())
-
- # Make sure that we received exactly the correct response
- self.assertEqual(
- trans.value(),
- '550 Cannot receive from specified address '
- '<alice@example.com>: Sender not acceptable\r\n')
-
-
- def test_portalRejectedAnonymousSender(self):
- """
- Test that a C{MAIL FROM} command issued without first authenticating
- when a portal has been configured to disallow anonymous logins is
- responded to with the correct error code.
- """
- realm = SingletonRealm(smtp.IMessageDelivery, NotImplementedDelivery())
- portal = Portal(realm, [])
- proto = smtp.SMTP()
- proto.portal = portal
- trans = StringTransport()
- proto.makeConnection(trans)
-
- # Deal with the necessary preliminaries
- proto.dataReceived('HELO example.com\r\n')
- trans.clear()
-
- # Try to specify our sender address
- proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
-
- # Clean up the protocol before doing anything that might raise an
- # exception.
- proto.connectionLost(error.ConnectionLost())
-
- # Make sure that we received exactly the correct response
- self.assertEqual(
- trans.value(),
- '550 Cannot receive from specified address '
- '<alice@example.com>: Unauthenticated senders not allowed\r\n')
-
-
-
-class ESMTPAuthenticationTestCase(unittest.TestCase):
- def assertServerResponse(self, bytes, response):
- """
- Assert that when the given bytes are delivered to the ESMTP server
- instance, it responds with the indicated lines.
-
- @type bytes: str
- @type response: list of str
- """
- self.transport.clear()
- self.server.dataReceived(bytes)
- self.assertEqual(
- response,
- self.transport.value().splitlines())
-
-
- def assertServerAuthenticated(self, loginArgs, username="username", password="password"):
- """
- Assert that a login attempt has been made, that the credentials and
- interfaces passed to it are correct, and that when the login request
- is satisfied, a successful response is sent by the ESMTP server
- instance.
-
- @param loginArgs: A C{list} previously passed to L{portalFactory}.
- """
- d, credentials, mind, interfaces = loginArgs.pop()
- self.assertEqual(loginArgs, [])
- self.failUnless(twisted.cred.credentials.IUsernamePassword.providedBy(credentials))
- self.assertEqual(credentials.username, username)
- self.failUnless(credentials.checkPassword(password))
- self.assertIn(smtp.IMessageDeliveryFactory, interfaces)
- self.assertIn(smtp.IMessageDelivery, interfaces)
- d.callback((smtp.IMessageDeliveryFactory, None, lambda: None))
-
- self.assertEqual(
- ["235 Authentication successful."],
- self.transport.value().splitlines())
-
-
- def setUp(self):
- """
- Create an ESMTP instance attached to a StringTransport.
- """
- self.server = smtp.ESMTP({
- 'LOGIN': imap4.LOGINCredentials})
- self.server.host = 'localhost'
- self.transport = StringTransport(
- peerAddress=address.IPv4Address('TCP', '127.0.0.1', 12345))
- self.server.makeConnection(self.transport)
-
-
- def tearDown(self):
- """
- Disconnect the ESMTP instance to clean up its timeout DelayedCall.
- """
- self.server.connectionLost(error.ConnectionDone())
-
-
- def portalFactory(self, loginList):
- class DummyPortal:
- def login(self, credentials, mind, *interfaces):
- d = defer.Deferred()
- loginList.append((d, credentials, mind, interfaces))
- return d
- return DummyPortal()
-
-
- def test_authenticationCapabilityAdvertised(self):
- """
- Test that AUTH is advertised to clients which issue an EHLO command.
- """
- self.transport.clear()
- self.server.dataReceived('EHLO\r\n')
- responseLines = self.transport.value().splitlines()
- self.assertEqual(
- responseLines[0],
- "250-localhost Hello 127.0.0.1, nice to meet you")
- self.assertEqual(
- responseLines[1],
- "250 AUTH LOGIN")
- self.assertEqual(len(responseLines), 2)
-
-
- def test_plainAuthentication(self):
- """
- Test that the LOGIN authentication mechanism can be used
- """
- loginArgs = []
- self.server.portal = self.portalFactory(loginArgs)
-
- self.server.dataReceived('EHLO\r\n')
- self.transport.clear()
-
- self.assertServerResponse(
- 'AUTH LOGIN\r\n',
- ["334 " + "User Name\0".encode('base64').strip()])
-
- self.assertServerResponse(
- 'username'.encode('base64') + '\r\n',
- ["334 " + "Password\0".encode('base64').strip()])
-
- self.assertServerResponse(
- 'password'.encode('base64').strip() + '\r\n',
- [])
-
- self.assertServerAuthenticated(loginArgs)
-
-
- def test_plainAuthenticationEmptyPassword(self):
- """
- Test that giving an empty password for plain auth succeeds.
- """
- loginArgs = []
- self.server.portal = self.portalFactory(loginArgs)
-
- self.server.dataReceived('EHLO\r\n')
- self.transport.clear()
-
- self.assertServerResponse(
- 'AUTH LOGIN\r\n',
- ["334 " + "User Name\0".encode('base64').strip()])
-
- self.assertServerResponse(
- 'username'.encode('base64') + '\r\n',
- ["334 " + "Password\0".encode('base64').strip()])
-
- self.assertServerResponse('\r\n', [])
- self.assertServerAuthenticated(loginArgs, password='')
-
-
- def test_plainAuthenticationInitialResponse(self):
- """
- The response to the first challenge may be included on the AUTH command
- line. Test that this is also supported.
- """
- loginArgs = []
- self.server.portal = self.portalFactory(loginArgs)
-
- self.server.dataReceived('EHLO\r\n')
- self.transport.clear()
-
- self.assertServerResponse(
- 'AUTH LOGIN ' + "username".encode('base64').strip() + '\r\n',
- ["334 " + "Password\0".encode('base64').strip()])
-
- self.assertServerResponse(
- 'password'.encode('base64').strip() + '\r\n',
- [])
-
- self.assertServerAuthenticated(loginArgs)
-
-
- def test_abortAuthentication(self):
- """
- Test that a challenge/response sequence can be aborted by the client.
- """
- loginArgs = []
- self.server.portal = self.portalFactory(loginArgs)
-
- self.server.dataReceived('EHLO\r\n')
- self.server.dataReceived('AUTH LOGIN\r\n')
-
- self.assertServerResponse(
- '*\r\n',
- ['501 Authentication aborted'])
-
-
- def test_invalidBase64EncodedResponse(self):
- """
- Test that a response which is not properly Base64 encoded results in
- the appropriate error code.
- """
- loginArgs = []
- self.server.portal = self.portalFactory(loginArgs)
-
- self.server.dataReceived('EHLO\r\n')
- self.server.dataReceived('AUTH LOGIN\r\n')
-
- self.assertServerResponse(
- 'x\r\n',
- ['501 Syntax error in parameters or arguments'])
-
- self.assertEqual(loginArgs, [])
-
-
- def test_invalidBase64EncodedInitialResponse(self):
- """
- Like L{test_invalidBase64EncodedResponse} but for the case of an
- initial response included with the C{AUTH} command.
- """
- loginArgs = []
- self.server.portal = self.portalFactory(loginArgs)
-
- self.server.dataReceived('EHLO\r\n')
- self.assertServerResponse(
- 'AUTH LOGIN x\r\n',
- ['501 Syntax error in parameters or arguments'])
-
- self.assertEqual(loginArgs, [])
-
-
- def test_unexpectedLoginFailure(self):
- """
- If the L{Deferred} returned by L{Portal.login} fires with an
- exception of any type other than L{UnauthorizedLogin}, the exception
- is logged and the client is informed that the authentication attempt
- has failed.
- """
- loginArgs = []
- self.server.portal = self.portalFactory(loginArgs)
-
- self.server.dataReceived('EHLO\r\n')
- self.transport.clear()
-
- self.assertServerResponse(
- 'AUTH LOGIN ' + 'username'.encode('base64').strip() + '\r\n',
- ['334 ' + 'Password\0'.encode('base64').strip()])
- self.assertServerResponse(
- 'password'.encode('base64').strip() + '\r\n',
- [])
-
- d, credentials, mind, interfaces = loginArgs.pop()
- d.errback(RuntimeError("Something wrong with the server"))
-
- self.assertEqual(
- '451 Requested action aborted: local error in processing\r\n',
- self.transport.value())
-
- self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
-
-
-
-class SMTPClientErrorTestCase(unittest.TestCase):
- """
- Tests for L{smtp.SMTPClientError}.
- """
- def test_str(self):
- """
- The string representation of a L{SMTPClientError} instance includes
- the response code and response string.
- """
- err = smtp.SMTPClientError(123, "some text")
- self.assertEqual(str(err), "123 some text")
-
-
- def test_strWithNegativeCode(self):
- """
- If the response code supplied to L{SMTPClientError} is negative, it
- is excluded from the string representation.
- """
- err = smtp.SMTPClientError(-1, "foo bar")
- self.assertEqual(str(err), "foo bar")
-
-
- def test_strWithLog(self):
- """
- If a line log is supplied to L{SMTPClientError}, its contents are
- included in the string representation of the exception instance.
- """
- log = LineLog(10)
- log.append("testlog")
- log.append("secondline")
- err = smtp.SMTPClientError(100, "test error", log=log.str())
- self.assertEqual(
- str(err),
- "100 test error\n"
- "testlog\n"
- "secondline\n")
-
-
-
-class SenderMixinSentMailTests(unittest.TestCase):
- """
- Tests for L{smtp.SenderMixin.sentMail}, used in particular by
- L{smtp.SMTPSenderFactory} and L{smtp.ESMTPSenderFactory}.
- """
- def test_onlyLogFailedAddresses(self):
- """
- L{smtp.SenderMixin.sentMail} adds only the addresses with failing
- SMTP response codes to the log passed to the factory's errback.
- """
- onDone = self.assertFailure(defer.Deferred(), smtp.SMTPDeliveryError)
- onDone.addCallback(lambda e: self.assertEqual(
- e.log, "bob@example.com: 199 Error in sending.\n"))
-
- clientFactory = smtp.SMTPSenderFactory(
- 'source@address', 'recipient@address',
- StringIO("Message body"), onDone,
- retries=0, timeout=0.5)
-
- client = clientFactory.buildProtocol(
- address.IPv4Address('TCP', 'example.net', 25))
-
- addresses = [("alice@example.com", 200, "No errors here!"),
- ("bob@example.com", 199, "Error in sending.")]
- client.sentMail(199, "Test response", 1, addresses, client.log)
-
- return onDone