aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_imap.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_imap.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_imap.py4829
1 files changed, 0 insertions, 4829 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_imap.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_imap.py
deleted file mode 100755
index b2461ce1..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/test/test_imap.py
+++ /dev/null
@@ -1,4829 +0,0 @@
-# -*- test-case-name: twisted.mail.test.test_imap -*-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-"""
-Test case for twisted.mail.imap4
-"""
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-
-import codecs
-import locale
-import os
-import types
-
-from zope.interface import implements
-
-from twisted.mail.imap4 import MessageSet
-from twisted.mail import imap4
-from twisted.protocols import loopback
-from twisted.internet import defer
-from twisted.internet import error
-from twisted.internet import reactor
-from twisted.internet import interfaces
-from twisted.internet.task import Clock
-from twisted.trial import unittest
-from twisted.python import util
-from twisted.python import failure
-
-from twisted import cred
-import twisted.cred.error
-import twisted.cred.checkers
-import twisted.cred.credentials
-import twisted.cred.portal
-
-from twisted.test.proto_helpers import StringTransport, StringTransportWithDisconnection
-
-try:
- from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
-except ImportError:
- ClientTLSContext = ServerTLSContext = None
-
-def strip(f):
- return lambda result, f=f: f()
-
-def sortNest(l):
- l = l[:]
- l.sort()
- for i in range(len(l)):
- if isinstance(l[i], types.ListType):
- l[i] = sortNest(l[i])
- elif isinstance(l[i], types.TupleType):
- l[i] = tuple(sortNest(list(l[i])))
- return l
-
-class IMAP4UTF7TestCase(unittest.TestCase):
- tests = [
- [u'Hello world', 'Hello world'],
- [u'Hello & world', 'Hello &- world'],
- [u'Hello\xffworld', 'Hello&AP8-world'],
- [u'\xff\xfe\xfd\xfc', '&AP8A,gD9APw-'],
- [u'~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317',
- '~peter/mail/&ZeVnLIqe-/&U,BTFw-'], # example from RFC 2060
- ]
-
- def test_encodeWithErrors(self):
- """
- Specifying an error policy to C{unicode.encode} with the
- I{imap4-utf-7} codec should produce the same result as not
- specifying the error policy.
- """
- text = u'Hello world'
- self.assertEqual(
- text.encode('imap4-utf-7', 'strict'),
- text.encode('imap4-utf-7'))
-
-
- def test_decodeWithErrors(self):
- """
- Similar to L{test_encodeWithErrors}, but for C{str.decode}.
- """
- bytes = 'Hello world'
- self.assertEqual(
- bytes.decode('imap4-utf-7', 'strict'),
- bytes.decode('imap4-utf-7'))
-
-
- def test_getreader(self):
- """
- C{codecs.getreader('imap4-utf-7')} returns the I{imap4-utf-7} stream
- reader class.
- """
- reader = codecs.getreader('imap4-utf-7')(StringIO('Hello&AP8-world'))
- self.assertEqual(reader.read(), u'Hello\xffworld')
-
-
- def test_getwriter(self):
- """
- C{codecs.getwriter('imap4-utf-7')} returns the I{imap4-utf-7} stream
- writer class.
- """
- output = StringIO()
- writer = codecs.getwriter('imap4-utf-7')(output)
- writer.write(u'Hello\xffworld')
- self.assertEqual(output.getvalue(), 'Hello&AP8-world')
-
-
- def test_encode(self):
- """
- The I{imap4-utf-7} can be used to encode a unicode string into a byte
- string according to the IMAP4 modified UTF-7 encoding rules.
- """
- for (input, output) in self.tests:
- self.assertEqual(input.encode('imap4-utf-7'), output)
-
-
- def test_decode(self):
- """
- The I{imap4-utf-7} can be used to decode a byte string into a unicode
- string according to the IMAP4 modified UTF-7 encoding rules.
- """
- for (input, output) in self.tests:
- self.assertEqual(input, output.decode('imap4-utf-7'))
-
-
- def test_printableSingletons(self):
- """
- The IMAP4 modified UTF-7 implementation encodes all printable
- characters which are in ASCII using the corresponding ASCII byte.
- """
- # All printables represent themselves
- for o in range(0x20, 0x26) + range(0x27, 0x7f):
- self.assertEqual(chr(o), chr(o).encode('imap4-utf-7'))
- self.assertEqual(chr(o), chr(o).decode('imap4-utf-7'))
- self.assertEqual('&'.encode('imap4-utf-7'), '&-')
- self.assertEqual('&-'.decode('imap4-utf-7'), '&')
-
-
-
-class BufferingConsumer:
- def __init__(self):
- self.buffer = []
-
- def write(self, bytes):
- self.buffer.append(bytes)
- if self.consumer:
- self.consumer.resumeProducing()
-
- def registerProducer(self, consumer, streaming):
- self.consumer = consumer
- self.consumer.resumeProducing()
-
- def unregisterProducer(self):
- self.consumer = None
-
-class MessageProducerTestCase(unittest.TestCase):
- def testSinglePart(self):
- body = 'This is body text. Rar.'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'text/plain'
-
- msg = FakeyMessage(headers, (), None, body, 123, None )
-
- c = BufferingConsumer()
- p = imap4.MessageProducer(msg)
- d = p.beginProducing(c)
-
- def cbProduced(result):
- self.assertIdentical(result, p)
- self.assertEqual(
- ''.join(c.buffer),
-
- '{119}\r\n'
- 'From: sender@host\r\n'
- 'To: recipient@domain\r\n'
- 'Subject: booga booga boo\r\n'
- 'Content-Type: text/plain\r\n'
- '\r\n'
- + body)
- return d.addCallback(cbProduced)
-
-
- def testSingleMultiPart(self):
- outerBody = ''
- innerBody = 'Contained body message text. Squarge.'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'multipart/alternative; boundary="xyz"'
-
- innerHeaders = util.OrderedDict()
- innerHeaders['subject'] = 'this is subject text'
- innerHeaders['content-type'] = 'text/plain'
- msg = FakeyMessage(headers, (), None, outerBody, 123,
- [FakeyMessage(innerHeaders, (), None, innerBody,
- None, None)],
- )
-
- c = BufferingConsumer()
- p = imap4.MessageProducer(msg)
- d = p.beginProducing(c)
-
- def cbProduced(result):
- self.failUnlessIdentical(result, p)
-
- self.assertEqual(
- ''.join(c.buffer),
-
- '{239}\r\n'
- 'From: sender@host\r\n'
- 'To: recipient@domain\r\n'
- 'Subject: booga booga boo\r\n'
- 'Content-Type: multipart/alternative; boundary="xyz"\r\n'
- '\r\n'
- '\r\n'
- '--xyz\r\n'
- 'Subject: this is subject text\r\n'
- 'Content-Type: text/plain\r\n'
- '\r\n'
- + innerBody
- + '\r\n--xyz--\r\n')
-
- return d.addCallback(cbProduced)
-
-
- def testMultipleMultiPart(self):
- outerBody = ''
- innerBody1 = 'Contained body message text. Squarge.'
- innerBody2 = 'Secondary <i>message</i> text of squarge body.'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'multipart/alternative; boundary="xyz"'
- innerHeaders = util.OrderedDict()
- innerHeaders['subject'] = 'this is subject text'
- innerHeaders['content-type'] = 'text/plain'
- innerHeaders2 = util.OrderedDict()
- innerHeaders2['subject'] = '<b>this is subject</b>'
- innerHeaders2['content-type'] = 'text/html'
- msg = FakeyMessage(headers, (), None, outerBody, 123, [
- FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
- FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)
- ],
- )
-
- c = BufferingConsumer()
- p = imap4.MessageProducer(msg)
- d = p.beginProducing(c)
-
- def cbProduced(result):
- self.failUnlessIdentical(result, p)
-
- self.assertEqual(
- ''.join(c.buffer),
-
- '{354}\r\n'
- 'From: sender@host\r\n'
- 'To: recipient@domain\r\n'
- 'Subject: booga booga boo\r\n'
- 'Content-Type: multipart/alternative; boundary="xyz"\r\n'
- '\r\n'
- '\r\n'
- '--xyz\r\n'
- 'Subject: this is subject text\r\n'
- 'Content-Type: text/plain\r\n'
- '\r\n'
- + innerBody1
- + '\r\n--xyz\r\n'
- 'Subject: <b>this is subject</b>\r\n'
- 'Content-Type: text/html\r\n'
- '\r\n'
- + innerBody2
- + '\r\n--xyz--\r\n')
- return d.addCallback(cbProduced)
-
-
-
-class IMAP4HelperTestCase(unittest.TestCase):
- """
- Tests for various helper utilities in the IMAP4 module.
- """
-
- def test_fileProducer(self):
- b = (('x' * 1) + ('y' * 1) + ('z' * 1)) * 10
- c = BufferingConsumer()
- f = StringIO(b)
- p = imap4.FileProducer(f)
- d = p.beginProducing(c)
-
- def cbProduced(result):
- self.failUnlessIdentical(result, p)
- self.assertEqual(
- ('{%d}\r\n' % len(b))+ b,
- ''.join(c.buffer))
- return d.addCallback(cbProduced)
-
-
- def test_wildcard(self):
- cases = [
- ['foo/%gum/bar',
- ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
- ['foo/xgum/bar', 'foo/gum/bar'],
- ], ['foo/x%x/bar',
- ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
- ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar'],
- ], ['foo/xyz*abc/bar',
- ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
- ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
- ]
- ]
-
- for (wildcard, fail, succeed) in cases:
- wildcard = imap4.wildcardToRegexp(wildcard, '/')
- for x in fail:
- self.failIf(wildcard.match(x))
- for x in succeed:
- self.failUnless(wildcard.match(x))
-
-
- def test_wildcardNoDelim(self):
- cases = [
- ['foo/%gum/bar',
- ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
- ['foo/xgum/bar', 'foo/gum/bar', 'foo/x/gum/bar'],
- ], ['foo/x%x/bar',
- ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
- ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar', 'foo/x/x/bar'],
- ], ['foo/xyz*abc/bar',
- ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
- ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
- ]
- ]
-
- for (wildcard, fail, succeed) in cases:
- wildcard = imap4.wildcardToRegexp(wildcard, None)
- for x in fail:
- self.failIf(wildcard.match(x), x)
- for x in succeed:
- self.failUnless(wildcard.match(x), x)
-
-
- def test_headerFormatter(self):
- """
- L{imap4._formatHeaders} accepts a C{dict} of header name/value pairs and
- returns a string representing those headers in the standard multiline,
- C{":"}-separated format.
- """
- cases = [
- ({'Header1': 'Value1', 'Header2': 'Value2'}, 'Header2: Value2\r\nHeader1: Value1\r\n'),
- ]
-
- for (input, expected) in cases:
- output = imap4._formatHeaders(input)
- self.assertEqual(sorted(output.splitlines(True)),
- sorted(expected.splitlines(True)))
-
-
- def test_messageSet(self):
- m1 = MessageSet()
- m2 = MessageSet()
-
- self.assertEqual(m1, m2)
-
- m1 = m1 + (1, 3)
- self.assertEqual(len(m1), 3)
- self.assertEqual(list(m1), [1, 2, 3])
-
- m2 = m2 + (1, 3)
- self.assertEqual(m1, m2)
- self.assertEqual(list(m1 + m2), [1, 2, 3])
-
-
- def test_messageSetStringRepresentationWithWildcards(self):
- """
- In a L{MessageSet}, in the presence of wildcards, if the highest message
- id is known, the wildcard should get replaced by that high value.
- """
- inputs = [
- MessageSet(imap4.parseIdList('*')),
- MessageSet(imap4.parseIdList('3:*', 6)),
- MessageSet(imap4.parseIdList('*:2', 6)),
- ]
-
- outputs = [
- "*",
- "3:6",
- "2:6",
- ]
-
- for i, o in zip(inputs, outputs):
- self.assertEqual(str(i), o)
-
-
- def test_messageSetStringRepresentationWithInversion(self):
- """
- In a L{MessageSet}, inverting the high and low numbers in a range
- doesn't affect the meaning of the range. For example, 3:2 displays just
- like 2:3, because according to the RFC they have the same meaning.
- """
- inputs = [
- MessageSet(imap4.parseIdList('2:3')),
- MessageSet(imap4.parseIdList('3:2')),
- ]
-
- outputs = [
- "2:3",
- "2:3",
- ]
-
- for i, o in zip(inputs, outputs):
- self.assertEqual(str(i), o)
-
-
- def test_quotedSplitter(self):
- cases = [
- '''Hello World''',
- '''Hello "World!"''',
- '''World "Hello" "How are you?"''',
- '''"Hello world" How "are you?"''',
- '''foo bar "baz buz" NIL''',
- '''foo bar "baz buz" "NIL"''',
- '''foo NIL "baz buz" bar''',
- '''foo "NIL" "baz buz" bar''',
- '''"NIL" bar "baz buz" foo''',
- 'oo \\"oo\\" oo',
- '"oo \\"oo\\" oo"',
- 'oo \t oo',
- '"oo \t oo"',
- 'oo \\t oo',
- '"oo \\t oo"',
- 'oo \o oo',
- '"oo \o oo"',
- 'oo \\o oo',
- '"oo \\o oo"',
- ]
-
- answers = [
- ['Hello', 'World'],
- ['Hello', 'World!'],
- ['World', 'Hello', 'How are you?'],
- ['Hello world', 'How', 'are you?'],
- ['foo', 'bar', 'baz buz', None],
- ['foo', 'bar', 'baz buz', 'NIL'],
- ['foo', None, 'baz buz', 'bar'],
- ['foo', 'NIL', 'baz buz', 'bar'],
- ['NIL', 'bar', 'baz buz', 'foo'],
- ['oo', '"oo"', 'oo'],
- ['oo "oo" oo'],
- ['oo', 'oo'],
- ['oo \t oo'],
- ['oo', '\\t', 'oo'],
- ['oo \\t oo'],
- ['oo', '\o', 'oo'],
- ['oo \o oo'],
- ['oo', '\\o', 'oo'],
- ['oo \\o oo'],
-
- ]
-
- errors = [
- '"mismatched quote',
- 'mismatched quote"',
- 'mismatched"quote',
- '"oops here is" another"',
- ]
-
- for s in errors:
- self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s)
-
- for (case, expected) in zip(cases, answers):
- self.assertEqual(imap4.splitQuoted(case), expected)
-
-
- def test_stringCollapser(self):
- cases = [
- ['a', 'b', 'c', 'd', 'e'],
- ['a', ' ', '"', 'b', 'c', ' ', '"', ' ', 'd', 'e'],
- [['a', 'b', 'c'], 'd', 'e'],
- ['a', ['b', 'c', 'd'], 'e'],
- ['a', 'b', ['c', 'd', 'e']],
- ['"', 'a', ' ', '"', ['b', 'c', 'd'], '"', ' ', 'e', '"'],
- ['a', ['"', ' ', 'b', 'c', ' ', ' ', '"'], 'd', 'e'],
- ]
-
- answers = [
- ['abcde'],
- ['a', 'bc ', 'de'],
- [['abc'], 'de'],
- ['a', ['bcd'], 'e'],
- ['ab', ['cde']],
- ['a ', ['bcd'], ' e'],
- ['a', [' bc '], 'de'],
- ]
-
- for (case, expected) in zip(cases, answers):
- self.assertEqual(imap4.collapseStrings(case), expected)
-
-
- def test_parenParser(self):
- s = '\r\n'.join(['xx'] * 4)
- cases = [
- '(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%s)' % (len(s), s,),
-
-# '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
-# 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
-# '"IMAP4rev1 WG mtg summary and minutes" '
-# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
-# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
-# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
-# '((NIL NIL "imap" "cac.washington.edu")) '
-# '((NIL NIL "minutes" "CNRI.Reston.VA.US") '
-# '("John Klensin" NIL "KLENSIN" "INFOODS.MIT.EDU")) NIL NIL '
-# '"<B27397-0100000@cac.washington.edu>") '
-# 'BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 92))',
-
- '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
- 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
- '"IMAP4rev1 WG mtg summary and minutes" '
- '(("Terry Gray" NIL gray cac.washington.edu)) '
- '(("Terry Gray" NIL gray cac.washington.edu)) '
- '(("Terry Gray" NIL gray cac.washington.edu)) '
- '((NIL NIL imap cac.washington.edu)) '
- '((NIL NIL minutes CNRI.Reston.VA.US) '
- '("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL '
- '<B27397-0100000@cac.washington.edu>) '
- 'BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))',
- '("oo \\"oo\\" oo")',
- '("oo \\\\ oo")',
- '("oo \\ oo")',
- '("oo \\o")',
- '("oo \o")',
- '(oo \o)',
- '(oo \\o)',
-
- ]
-
- answers = [
- ['BODY.PEEK', ['HEADER.FIELDS.NOT', ['subject', 'bcc', 'cc']], s],
-
- ['FLAGS', [r'\Seen'], 'INTERNALDATE',
- '17-Jul-1996 02:44:25 -0700', 'RFC822.SIZE', '4286', 'ENVELOPE',
- ['Wed, 17 Jul 1996 02:23:25 -0700 (PDT)',
- 'IMAP4rev1 WG mtg summary and minutes', [["Terry Gray", None,
- "gray", "cac.washington.edu"]], [["Terry Gray", None,
- "gray", "cac.washington.edu"]], [["Terry Gray", None,
- "gray", "cac.washington.edu"]], [[None, None, "imap",
- "cac.washington.edu"]], [[None, None, "minutes",
- "CNRI.Reston.VA.US"], ["John Klensin", None, "KLENSIN",
- "INFOODS.MIT.EDU"]], None, None,
- "<B27397-0100000@cac.washington.edu>"], "BODY", ["TEXT", "PLAIN",
- ["CHARSET", "US-ASCII"], None, None, "7BIT", "3028", "92"]],
- ['oo "oo" oo'],
- ['oo \\\\ oo'],
- ['oo \\ oo'],
- ['oo \\o'],
- ['oo \o'],
- ['oo', '\o'],
- ['oo', '\\o'],
- ]
-
- for (case, expected) in zip(cases, answers):
- self.assertEqual(imap4.parseNestedParens(case), [expected])
-
- # XXX This code used to work, but changes occurred within the
- # imap4.py module which made it no longer necessary for *all* of it
- # to work. In particular, only the part that makes
- # 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out correctly
- # no longer needs to work. So, I am loathe to delete the entire
- # section of the test. --exarkun
- #
-
-# for (case, expected) in zip(answers, cases):
-# self.assertEqual('(' + imap4.collapseNestedLists(case) + ')', expected)
-
-
- def test_fetchParserSimple(self):
- cases = [
- ['ENVELOPE', 'Envelope'],
- ['FLAGS', 'Flags'],
- ['INTERNALDATE', 'InternalDate'],
- ['RFC822.HEADER', 'RFC822Header'],
- ['RFC822.SIZE', 'RFC822Size'],
- ['RFC822.TEXT', 'RFC822Text'],
- ['RFC822', 'RFC822'],
- ['UID', 'UID'],
- ['BODYSTRUCTURE', 'BodyStructure'],
- ]
-
- for (inp, outp) in cases:
- p = imap4._FetchParser()
- p.parseString(inp)
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], getattr(p, outp)))
-
-
- def test_fetchParserMacros(self):
- cases = [
- ['ALL', (4, ['flags', 'internaldate', 'rfc822.size', 'envelope'])],
- ['FULL', (5, ['flags', 'internaldate', 'rfc822.size', 'envelope', 'body'])],
- ['FAST', (3, ['flags', 'internaldate', 'rfc822.size'])],
- ]
-
- for (inp, outp) in cases:
- p = imap4._FetchParser()
- p.parseString(inp)
- self.assertEqual(len(p.result), outp[0])
- p = [str(p).lower() for p in p.result]
- p.sort()
- outp[1].sort()
- self.assertEqual(p, outp[1])
-
-
- def test_fetchParserBody(self):
- P = imap4._FetchParser
-
- p = P()
- p.parseString('BODY')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, False)
- self.assertEqual(p.result[0].header, None)
- self.assertEqual(str(p.result[0]), 'BODY')
-
- p = P()
- p.parseString('BODY.PEEK')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, True)
- self.assertEqual(str(p.result[0]), 'BODY')
-
- p = P()
- p.parseString('BODY[]')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].empty, True)
- self.assertEqual(str(p.result[0]), 'BODY[]')
-
- p = P()
- p.parseString('BODY[HEADER]')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, False)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEqual(p.result[0].header.negate, True)
- self.assertEqual(p.result[0].header.fields, ())
- self.assertEqual(p.result[0].empty, False)
- self.assertEqual(str(p.result[0]), 'BODY[HEADER]')
-
- p = P()
- p.parseString('BODY.PEEK[HEADER]')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, True)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEqual(p.result[0].header.negate, True)
- self.assertEqual(p.result[0].header.fields, ())
- self.assertEqual(p.result[0].empty, False)
- self.assertEqual(str(p.result[0]), 'BODY[HEADER]')
-
- p = P()
- p.parseString('BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, False)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEqual(p.result[0].header.negate, False)
- self.assertEqual(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
- self.assertEqual(p.result[0].empty, False)
- self.assertEqual(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
-
- p = P()
- p.parseString('BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, True)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEqual(p.result[0].header.negate, False)
- self.assertEqual(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
- self.assertEqual(p.result[0].empty, False)
- self.assertEqual(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
-
- p = P()
- p.parseString('BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, True)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEqual(p.result[0].header.negate, True)
- self.assertEqual(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
- self.assertEqual(p.result[0].empty, False)
- self.assertEqual(str(p.result[0]), 'BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
-
- p = P()
- p.parseString('BODY[1.MIME]<10.50>')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, False)
- self.failUnless(isinstance(p.result[0].mime, p.MIME))
- self.assertEqual(p.result[0].part, (0,))
- self.assertEqual(p.result[0].partialBegin, 10)
- self.assertEqual(p.result[0].partialLength, 50)
- self.assertEqual(p.result[0].empty, False)
- self.assertEqual(str(p.result[0]), 'BODY[1.MIME]<10.50>')
-
- p = P()
- p.parseString('BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
- self.assertEqual(len(p.result), 1)
- self.failUnless(isinstance(p.result[0], p.Body))
- self.assertEqual(p.result[0].peek, True)
- self.failUnless(isinstance(p.result[0].header, p.Header))
- self.assertEqual(p.result[0].part, (0, 2, 8, 10))
- self.assertEqual(p.result[0].header.fields, ['MESSAGE-ID', 'DATE'])
- self.assertEqual(p.result[0].partialBegin, 103)
- self.assertEqual(p.result[0].partialLength, 69)
- self.assertEqual(p.result[0].empty, False)
- self.assertEqual(str(p.result[0]), 'BODY[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
-
-
- def test_files(self):
- inputStructure = [
- 'foo', 'bar', 'baz', StringIO('this is a file\r\n'), 'buz'
- ]
-
- output = '"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz"'
-
- self.assertEqual(imap4.collapseNestedLists(inputStructure), output)
-
-
- def test_quoteAvoider(self):
- input = [
- 'foo', imap4.DontQuoteMe('bar'), "baz", StringIO('this is a file\r\n'),
- imap4.DontQuoteMe('buz'), ""
- ]
-
- output = '"foo" bar "baz" {16}\r\nthis is a file\r\n buz ""'
-
- self.assertEqual(imap4.collapseNestedLists(input), output)
-
-
- def test_literals(self):
- cases = [
- ('({10}\r\n0123456789)', [['0123456789']]),
- ]
-
- for (case, expected) in cases:
- self.assertEqual(imap4.parseNestedParens(case), expected)
-
-
- def test_queryBuilder(self):
- inputs = [
- imap4.Query(flagged=1),
- imap4.Query(sorted=1, unflagged=1, deleted=1),
- imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)),
- imap4.Query(before='today'),
- imap4.Or(
- imap4.Query(deleted=1),
- imap4.Query(unseen=1),
- imap4.Query(new=1)
- ),
- imap4.Or(
- imap4.Not(
- imap4.Or(
- imap4.Query(sorted=1, since='yesterday', smaller=1000),
- imap4.Query(sorted=1, before='tuesday', larger=10000),
- imap4.Query(sorted=1, unseen=1, deleted=1, before='today'),
- imap4.Not(
- imap4.Query(subject='spam')
- ),
- ),
- ),
- imap4.Not(
- imap4.Query(uid='1:5')
- ),
- )
- ]
-
- outputs = [
- 'FLAGGED',
- '(DELETED UNFLAGGED)',
- '(OR FLAGGED DELETED)',
- '(BEFORE "today")',
- '(OR DELETED (OR UNSEEN NEW))',
- '(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) ' # Continuing
- '(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE ' # Some more
- '"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) ' # And more
- '(NOT (UID 1:5)))',
- ]
-
- for (query, expected) in zip(inputs, outputs):
- self.assertEqual(query, expected)
-
-
- def test_invalidIdListParser(self):
- """
- Trying to parse an invalid representation of a sequence range raises an
- L{IllegalIdentifierError}.
- """
- inputs = [
- '*:*',
- 'foo',
- '4:',
- 'bar:5'
- ]
-
- for input in inputs:
- self.assertRaises(imap4.IllegalIdentifierError,
- imap4.parseIdList, input, 12345)
-
-
- def test_invalidIdListParserNonPositive(self):
- """
- Zeroes and negative values are not accepted in id range expressions. RFC
- 3501 states that sequence numbers and sequence ranges consist of
- non-negative numbers (RFC 3501 section 9, the seq-number grammar item).
- """
- inputs = [
- '0:5',
- '0:0',
- '*:0',
- '0',
- '-3:5',
- '1:-2',
- '-1'
- ]
-
- for input in inputs:
- self.assertRaises(imap4.IllegalIdentifierError,
- imap4.parseIdList, input, 12345)
-
-
- def test_parseIdList(self):
- """
- The function to parse sequence ranges yields appropriate L{MessageSet}
- objects.
- """
- inputs = [
- '1:*',
- '5:*',
- '1:2,5:*',
- '*',
- '1',
- '1,2',
- '1,3,5',
- '1:10',
- '1:10,11',
- '1:5,10:20',
- '1,5:10',
- '1,5:10,15:20',
- '1:10,15,20:25',
- '4:2'
- ]
-
- outputs = [
- MessageSet(1, None),
- MessageSet(5, None),
- MessageSet(5, None) + MessageSet(1, 2),
- MessageSet(None, None),
- MessageSet(1),
- MessageSet(1, 2),
- MessageSet(1) + MessageSet(3) + MessageSet(5),
- MessageSet(1, 10),
- MessageSet(1, 11),
- MessageSet(1, 5) + MessageSet(10, 20),
- MessageSet(1) + MessageSet(5, 10),
- MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20),
- MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25),
- MessageSet(2, 4),
- ]
-
- lengths = [
- None, None, None,
- 1, 1, 2, 3, 10, 11, 16, 7, 13, 17, 3
- ]
-
- for (input, expected) in zip(inputs, outputs):
- self.assertEqual(imap4.parseIdList(input), expected)
-
- for (input, expected) in zip(inputs, lengths):
- if expected is None:
- self.assertRaises(TypeError, len, imap4.parseIdList(input))
- else:
- L = len(imap4.parseIdList(input))
- self.assertEqual(L, expected,
- "len(%r) = %r != %r" % (input, L, expected))
-
-class SimpleMailbox:
- implements(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox)
-
- flags = ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag')
- messages = []
- mUID = 0
- rw = 1
- closed = False
-
- def __init__(self):
- self.listeners = []
- self.addListener = self.listeners.append
- self.removeListener = self.listeners.remove
-
- def getFlags(self):
- return self.flags
-
- def getUIDValidity(self):
- return 42
-
- def getUIDNext(self):
- return len(self.messages) + 1
-
- def getMessageCount(self):
- return 9
-
- def getRecentCount(self):
- return 3
-
- def getUnseenCount(self):
- return 4
-
- def isWriteable(self):
- return self.rw
-
- def destroy(self):
- pass
-
- def getHierarchicalDelimiter(self):
- return '/'
-
- def requestStatus(self, names):
- r = {}
- if 'MESSAGES' in names:
- r['MESSAGES'] = self.getMessageCount()
- if 'RECENT' in names:
- r['RECENT'] = self.getRecentCount()
- if 'UIDNEXT' in names:
- r['UIDNEXT'] = self.getMessageCount() + 1
- if 'UIDVALIDITY' in names:
- r['UIDVALIDITY'] = self.getUID()
- if 'UNSEEN' in names:
- r['UNSEEN'] = self.getUnseenCount()
- return defer.succeed(r)
-
- def addMessage(self, message, flags, date = None):
- self.messages.append((message, flags, date, self.mUID))
- self.mUID += 1
- return defer.succeed(None)
-
- def expunge(self):
- delete = []
- for i in self.messages:
- if '\\Deleted' in i[1]:
- delete.append(i)
- for i in delete:
- self.messages.remove(i)
- return [i[3] for i in delete]
-
- def close(self):
- self.closed = True
-
-class Account(imap4.MemoryAccount):
- mailboxFactory = SimpleMailbox
- def _emptyMailbox(self, name, id):
- return self.mailboxFactory()
-
- def select(self, name, rw=1):
- mbox = imap4.MemoryAccount.select(self, name)
- if mbox is not None:
- mbox.rw = rw
- return mbox
-
-class SimpleServer(imap4.IMAP4Server):
- def __init__(self, *args, **kw):
- imap4.IMAP4Server.__init__(self, *args, **kw)
- realm = TestRealm()
- realm.theAccount = Account('testuser')
- portal = cred.portal.Portal(realm)
- c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
- self.checker = c
- self.portal = portal
- portal.registerChecker(c)
- self.timeoutTest = False
-
- def lineReceived(self, line):
- if self.timeoutTest:
- #Do not send a respones
- return
-
- imap4.IMAP4Server.lineReceived(self, line)
-
- _username = 'testuser'
- _password = 'password-test'
- def authenticateLogin(self, username, password):
- if username == self._username and password == self._password:
- return imap4.IAccount, self.theAccount, lambda: None
- raise cred.error.UnauthorizedLogin()
-
-
-class SimpleClient(imap4.IMAP4Client):
- def __init__(self, deferred, contextFactory = None):
- imap4.IMAP4Client.__init__(self, contextFactory)
- self.deferred = deferred
- self.events = []
-
- def serverGreeting(self, caps):
- self.deferred.callback(None)
-
- def modeChanged(self, writeable):
- self.events.append(['modeChanged', writeable])
- self.transport.loseConnection()
-
- def flagsChanged(self, newFlags):
- self.events.append(['flagsChanged', newFlags])
- self.transport.loseConnection()
-
- def newMessages(self, exists, recent):
- self.events.append(['newMessages', exists, recent])
- self.transport.loseConnection()
-
-
-
-class IMAP4HelperMixin:
-
- serverCTX = None
- clientCTX = None
-
- def setUp(self):
- d = defer.Deferred()
- self.server = SimpleServer(contextFactory=self.serverCTX)
- self.client = SimpleClient(d, contextFactory=self.clientCTX)
- self.connected = d
-
- SimpleMailbox.messages = []
- theAccount = Account('testuser')
- theAccount.mboxType = SimpleMailbox
- SimpleServer.theAccount = theAccount
-
- def tearDown(self):
- del self.server
- del self.client
- del self.connected
-
- def _cbStopClient(self, ignore):
- self.client.transport.loseConnection()
-
- def _ebGeneral(self, failure):
- self.client.transport.loseConnection()
- self.server.transport.loseConnection()
- failure.raiseException()
-
- def loopback(self):
- return loopback.loopbackAsync(self.server, self.client)
-
-class IMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
- def testCapability(self):
- caps = {}
- def getCaps():
- def gotCaps(c):
- caps.update(c)
- self.server.transport.loseConnection()
- return self.client.getCapabilities().addCallback(gotCaps)
- d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None}
- return d.addCallback(lambda _: self.assertEqual(expected, caps))
-
- def testCapabilityWithAuth(self):
- caps = {}
- self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
- def getCaps():
- def gotCaps(c):
- caps.update(c)
- self.server.transport.loseConnection()
- return self.client.getCapabilities().addCallback(gotCaps)
- d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
-
- expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
- 'IDLE': None, 'AUTH': ['CRAM-MD5']}
-
- return d.addCallback(lambda _: self.assertEqual(expCap, caps))
-
- def testLogout(self):
- self.loggedOut = 0
- def logout():
- def setLoggedOut():
- self.loggedOut = 1
- self.client.logout().addCallback(strip(setLoggedOut))
- self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
- d = self.loopback()
- return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1))
-
- def testNoop(self):
- self.responses = None
- def noop():
- def setResponses(responses):
- self.responses = responses
- self.server.transport.loseConnection()
- self.client.noop().addCallback(setResponses)
- self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
- d = self.loopback()
- return d.addCallback(lambda _: self.assertEqual(self.responses, []))
-
- def testLogin(self):
- def login():
- d = self.client.login('testuser', 'password-test')
- d.addCallback(self._cbStopClient)
- d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
- d = defer.gatherResults([d1, self.loopback()])
- return d.addCallback(self._cbTestLogin)
-
- def _cbTestLogin(self, ignored):
- self.assertEqual(self.server.account, SimpleServer.theAccount)
- self.assertEqual(self.server.state, 'auth')
-
- def testFailedLogin(self):
- def login():
- d = self.client.login('testuser', 'wrong-password')
- d.addBoth(self._cbStopClient)
-
- d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestFailedLogin)
-
- def _cbTestFailedLogin(self, ignored):
- self.assertEqual(self.server.account, None)
- self.assertEqual(self.server.state, 'unauth')
-
-
- def testLoginRequiringQuoting(self):
- self.server._username = '{test}user'
- self.server._password = '{test}password'
-
- def login():
- d = self.client.login('{test}user', '{test}password')
- d.addBoth(self._cbStopClient)
-
- d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestLoginRequiringQuoting)
-
- def _cbTestLoginRequiringQuoting(self, ignored):
- self.assertEqual(self.server.account, SimpleServer.theAccount)
- self.assertEqual(self.server.state, 'auth')
-
-
- def testNamespace(self):
- self.namespaceArgs = None
- def login():
- return self.client.login('testuser', 'password-test')
- def namespace():
- def gotNamespace(args):
- self.namespaceArgs = args
- self._cbStopClient(None)
- return self.client.namespace().addCallback(gotNamespace)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(namespace))
- d1.addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEqual(self.namespaceArgs,
- [[['', '/']], [], []]))
- return d
-
- def testSelect(self):
- SimpleServer.theAccount.addMailbox('test-mailbox')
- self.selectedArgs = None
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- def selected(args):
- self.selectedArgs = args
- self._cbStopClient(None)
- d = self.client.select('test-mailbox')
- d.addCallback(selected)
- return d
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(select))
- d1.addErrback(self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
-
- def _cbTestSelect(self, ignored):
- mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
- self.assertEqual(self.server.mbox, mbox)
- self.assertEqual(self.selectedArgs, {
- 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
- 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
- 'READ-WRITE': 1
- })
-
-
- def test_examine(self):
- """
- L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and
- returns a L{Deferred} which fires with a C{dict} with as many of the
- following keys as the server includes in its response: C{'FLAGS'},
- C{'EXISTS'}, C{'RECENT'}, C{'UNSEEN'}, C{'READ-WRITE'}, C{'READ-ONLY'},
- C{'UIDVALIDITY'}, and C{'PERMANENTFLAGS'}.
-
- Unfortunately the server doesn't generate all of these so it's hard to
- test the client's handling of them here. See
- L{IMAP4ClientExamineTests} below.
-
- See U{RFC 3501<http://www.faqs.org/rfcs/rfc3501.html>}, section 6.3.2,
- for details.
- """
- SimpleServer.theAccount.addMailbox('test-mailbox')
- self.examinedArgs = None
- def login():
- return self.client.login('testuser', 'password-test')
- def examine():
- def examined(args):
- self.examinedArgs = args
- self._cbStopClient(None)
- d = self.client.examine('test-mailbox')
- d.addCallback(examined)
- return d
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(examine))
- d1.addErrback(self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestExamine)
-
-
- def _cbTestExamine(self, ignored):
- mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
- self.assertEqual(self.server.mbox, mbox)
- self.assertEqual(self.examinedArgs, {
- 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
- 'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
- 'READ-WRITE': False})
-
-
- def testCreate(self):
- succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
- fail = ('testbox', 'test/box')
-
- def cb(): self.result.append(1)
- def eb(failure): self.result.append(0)
-
- def login():
- return self.client.login('testuser', 'password-test')
- def create():
- for name in succeed + fail:
- d = self.client.create(name)
- d.addCallback(strip(cb)).addErrback(eb)
- d.addCallbacks(self._cbStopClient, self._ebGeneral)
-
- self.result = []
- d1 = self.connected.addCallback(strip(login)).addCallback(strip(create))
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestCreate, succeed, fail)
-
- def _cbTestCreate(self, ignored, succeed, fail):
- self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail))
- mbox = SimpleServer.theAccount.mailboxes.keys()
- answers = ['inbox', 'testbox', 'test/box', 'test', 'test/box/box']
- mbox.sort()
- answers.sort()
- self.assertEqual(mbox, [a.upper() for a in answers])
-
- def testDelete(self):
- SimpleServer.theAccount.addMailbox('delete/me')
-
- def login():
- return self.client.login('testuser', 'password-test')
- def delete():
- return self.client.delete('delete/me')
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(delete), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.assertEqual(SimpleServer.theAccount.mailboxes.keys(), []))
- return d
-
- def testIllegalInboxDelete(self):
- self.stashed = None
- def login():
- return self.client.login('testuser', 'password-test')
- def delete():
- return self.client.delete('inbox')
- def stash(result):
- self.stashed = result
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(delete), self._ebGeneral)
- d1.addBoth(stash)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
- failure.Failure)))
- return d
-
-
- def testNonExistentDelete(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def delete():
- return self.client.delete('delete/me')
- def deleteFailed(failure):
- self.failure = failure
-
- self.failure = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(delete)).addErrback(deleteFailed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEqual(str(self.failure.value),
- 'No such mailbox'))
- return d
-
-
- def testIllegalDelete(self):
- m = SimpleMailbox()
- m.flags = (r'\Noselect',)
- SimpleServer.theAccount.addMailbox('delete', m)
- SimpleServer.theAccount.addMailbox('delete/me')
-
- def login():
- return self.client.login('testuser', 'password-test')
- def delete():
- return self.client.delete('delete')
- def deleteFailed(failure):
- self.failure = failure
-
- self.failure = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(delete)).addErrback(deleteFailed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- expected = "Hierarchically inferior mailboxes exist and \\Noselect is set"
- d.addCallback(lambda _:
- self.assertEqual(str(self.failure.value), expected))
- return d
-
- def testRename(self):
- SimpleServer.theAccount.addMailbox('oldmbox')
- def login():
- return self.client.login('testuser', 'password-test')
- def rename():
- return self.client.rename('oldmbox', 'newname')
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.assertEqual(SimpleServer.theAccount.mailboxes.keys(),
- ['NEWNAME']))
- return d
-
- def testIllegalInboxRename(self):
- self.stashed = None
- def login():
- return self.client.login('testuser', 'password-test')
- def rename():
- return self.client.rename('inbox', 'frotz')
- def stash(stuff):
- self.stashed = stuff
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addBoth(stash)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.failUnless(isinstance(self.stashed, failure.Failure)))
- return d
-
- def testHierarchicalRename(self):
- SimpleServer.theAccount.create('oldmbox/m1')
- SimpleServer.theAccount.create('oldmbox/m2')
- def login():
- return self.client.login('testuser', 'password-test')
- def rename():
- return self.client.rename('oldmbox', 'newname')
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(rename), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestHierarchicalRename)
-
- def _cbTestHierarchicalRename(self, ignored):
- mboxes = SimpleServer.theAccount.mailboxes.keys()
- expected = ['newname', 'newname/m1', 'newname/m2']
- mboxes.sort()
- self.assertEqual(mboxes, [s.upper() for s in expected])
-
- def testSubscribe(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def subscribe():
- return self.client.subscribe('this/mbox')
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(subscribe), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.assertEqual(SimpleServer.theAccount.subscriptions,
- ['THIS/MBOX']))
- return d
-
- def testUnsubscribe(self):
- SimpleServer.theAccount.subscriptions = ['THIS/MBOX', 'THAT/MBOX']
- def login():
- return self.client.login('testuser', 'password-test')
- def unsubscribe():
- return self.client.unsubscribe('this/mbox')
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _:
- self.assertEqual(SimpleServer.theAccount.subscriptions,
- ['THAT/MBOX']))
- return d
-
- def _listSetup(self, f):
- SimpleServer.theAccount.addMailbox('root/subthing')
- SimpleServer.theAccount.addMailbox('root/another-thing')
- SimpleServer.theAccount.addMailbox('non-root/subthing')
-
- def login():
- return self.client.login('testuser', 'password-test')
- def listed(answers):
- self.listed = answers
-
- self.listed = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(f), self._ebGeneral)
- d1.addCallbacks(listed, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
-
- def testList(self):
- def list():
- return self.client.list('root', '%')
- d = self._listSetup(list)
- d.addCallback(lambda listed: self.assertEqual(
- sortNest(listed),
- sortNest([
- (SimpleMailbox.flags, "/", "ROOT/SUBTHING"),
- (SimpleMailbox.flags, "/", "ROOT/ANOTHER-THING")
- ])
- ))
- return d
-
- def testLSub(self):
- SimpleServer.theAccount.subscribe('ROOT/SUBTHING')
- def lsub():
- return self.client.lsub('root', '%')
- d = self._listSetup(lsub)
- d.addCallback(self.assertEqual,
- [(SimpleMailbox.flags, "/", "ROOT/SUBTHING")])
- return d
-
- def testStatus(self):
- SimpleServer.theAccount.addMailbox('root/subthing')
- def login():
- return self.client.login('testuser', 'password-test')
- def status():
- return self.client.status('root/subthing', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
- def statused(result):
- self.statused = result
-
- self.statused = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(status), self._ebGeneral)
- d1.addCallbacks(statused, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- d.addCallback(lambda _: self.assertEqual(
- self.statused,
- {'MESSAGES': 9, 'UIDNEXT': '10', 'UNSEEN': 4}
- ))
- return d
-
- def testFailedStatus(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def status():
- return self.client.status('root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
- def statused(result):
- self.statused = result
- def failed(failure):
- self.failure = failure
-
- self.statused = self.failure = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(status), self._ebGeneral)
- d1.addCallbacks(statused, failed)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d1, d2]).addCallback(self._cbTestFailedStatus)
-
- def _cbTestFailedStatus(self, ignored):
- self.assertEqual(
- self.statused, None
- )
- self.assertEqual(
- self.failure.value.args,
- ('Could not open mailbox',)
- )
-
- def testFullAppend(self):
- infile = util.sibpath(__file__, 'rfc822.message')
- message = open(infile)
- SimpleServer.theAccount.addMailbox('root/subthing')
- def login():
- return self.client.login('testuser', 'password-test')
- def append():
- return self.client.append(
- 'root/subthing',
- message,
- ('\\SEEN', '\\DELETED'),
- 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
- )
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(append), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestFullAppend, infile)
-
- def _cbTestFullAppend(self, ignored, infile):
- mb = SimpleServer.theAccount.mailboxes['ROOT/SUBTHING']
- self.assertEqual(1, len(mb.messages))
- self.assertEqual(
- (['\\SEEN', '\\DELETED'], 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', 0),
- mb.messages[0][1:]
- )
- self.assertEqual(open(infile).read(), mb.messages[0][0].getvalue())
-
- def testPartialAppend(self):
- infile = util.sibpath(__file__, 'rfc822.message')
- message = open(infile)
- SimpleServer.theAccount.addMailbox('PARTIAL/SUBTHING')
- def login():
- return self.client.login('testuser', 'password-test')
- def append():
- message = file(infile)
- return self.client.sendCommand(
- imap4.Command(
- 'APPEND',
- 'PARTIAL/SUBTHING (\\SEEN) "Right now" {%d}' % os.path.getsize(infile),
- (), self.client._IMAP4Client__cbContinueAppend, message
- )
- )
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(append), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestPartialAppend, infile)
-
- def _cbTestPartialAppend(self, ignored, infile):
- mb = SimpleServer.theAccount.mailboxes['PARTIAL/SUBTHING']
- self.assertEqual(1, len(mb.messages))
- self.assertEqual(
- (['\\SEEN'], 'Right now', 0),
- mb.messages[0][1:]
- )
- self.assertEqual(open(infile).read(), mb.messages[0][0].getvalue())
-
- def testCheck(self):
- SimpleServer.theAccount.addMailbox('root/subthing')
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- return self.client.select('root/subthing')
- def check():
- return self.client.check()
-
- d = self.connected.addCallback(strip(login))
- d.addCallbacks(strip(select), self._ebGeneral)
- d.addCallbacks(strip(check), self._ebGeneral)
- d.addCallbacks(self._cbStopClient, self._ebGeneral)
- return self.loopback()
-
- # Okay, that was fun
-
- def testClose(self):
- m = SimpleMailbox()
- m.messages = [
- ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
- ('Message 2', ('AnotherFlag',), None, 1),
- ('Message 3', ('\\Deleted',), None, 2),
- ]
- SimpleServer.theAccount.addMailbox('mailbox', m)
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- return self.client.select('mailbox')
- def close():
- return self.client.close()
-
- d = self.connected.addCallback(strip(login))
- d.addCallbacks(strip(select), self._ebGeneral)
- d.addCallbacks(strip(close), self._ebGeneral)
- d.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m)
-
- def _cbTestClose(self, ignored, m):
- self.assertEqual(len(m.messages), 1)
- self.assertEqual(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
- self.failUnless(m.closed)
-
- def testExpunge(self):
- m = SimpleMailbox()
- m.messages = [
- ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
- ('Message 2', ('AnotherFlag',), None, 1),
- ('Message 3', ('\\Deleted',), None, 2),
- ]
- SimpleServer.theAccount.addMailbox('mailbox', m)
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- return self.client.select('mailbox')
- def expunge():
- return self.client.expunge()
- def expunged(results):
- self.failIf(self.server.mbox is None)
- self.results = results
-
- self.results = None
- d1 = self.connected.addCallback(strip(login))
- d1.addCallbacks(strip(select), self._ebGeneral)
- d1.addCallbacks(strip(expunge), self._ebGeneral)
- d1.addCallbacks(expunged, self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestExpunge, m)
-
- def _cbTestExpunge(self, ignored, m):
- self.assertEqual(len(m.messages), 1)
- self.assertEqual(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
-
- self.assertEqual(self.results, [0, 2])
-
-
-
-class IMAP4ServerSearchTestCase(IMAP4HelperMixin, unittest.TestCase):
- """
- Tests for the behavior of the search_* functions in L{imap4.IMAP4Server}.
- """
- def setUp(self):
- IMAP4HelperMixin.setUp(self)
- self.earlierQuery = ["10-Dec-2009"]
- self.sameDateQuery = ["13-Dec-2009"]
- self.laterQuery = ["16-Dec-2009"]
- self.seq = 0
- self.msg = FakeyMessage({"date" : "Mon, 13 Dec 2009 21:25:10 GMT"}, [],
- '', '', 1234, None)
-
-
- def test_searchSentBefore(self):
- """
- L{imap4.IMAP4Server.search_SENTBEFORE} returns True if the message date
- is earlier than the query date.
- """
- self.assertFalse(
- self.server.search_SENTBEFORE(self.earlierQuery, self.seq, self.msg))
- self.assertTrue(
- self.server.search_SENTBEFORE(self.laterQuery, self.seq, self.msg))
-
- def test_searchWildcard(self):
- """
- L{imap4.IMAP4Server.search_UID} returns True if the message UID is in
- the search range.
- """
- self.assertFalse(
- self.server.search_UID(['2:3'], self.seq, self.msg, (1, 1234)))
- # 2:* should get translated to 2:<max UID> and then to 1:2
- self.assertTrue(
- self.server.search_UID(['2:*'], self.seq, self.msg, (1, 1234)))
- self.assertTrue(
- self.server.search_UID(['*'], self.seq, self.msg, (1, 1234)))
-
- def test_searchWildcardHigh(self):
- """
- L{imap4.IMAP4Server.search_UID} should return True if there is a
- wildcard, because a wildcard means "highest UID in the mailbox".
- """
- self.assertTrue(
- self.server.search_UID(['1235:*'], self.seq, self.msg, (1234, 1)))
-
- def test_reversedSearchTerms(self):
- """
- L{imap4.IMAP4Server.search_SENTON} returns True if the message date is
- the same as the query date.
- """
- msgset = imap4.parseIdList('4:2')
- self.assertEqual(list(msgset), [2, 3, 4])
-
- def test_searchSentOn(self):
- """
- L{imap4.IMAP4Server.search_SENTON} returns True if the message date is
- the same as the query date.
- """
- self.assertFalse(
- self.server.search_SENTON(self.earlierQuery, self.seq, self.msg))
- self.assertTrue(
- self.server.search_SENTON(self.sameDateQuery, self.seq, self.msg))
- self.assertFalse(
- self.server.search_SENTON(self.laterQuery, self.seq, self.msg))
-
-
- def test_searchSentSince(self):
- """
- L{imap4.IMAP4Server.search_SENTSINCE} returns True if the message date
- is later than the query date.
- """
- self.assertTrue(
- self.server.search_SENTSINCE(self.earlierQuery, self.seq, self.msg))
- self.assertFalse(
- self.server.search_SENTSINCE(self.laterQuery, self.seq, self.msg))
-
-
- def test_searchOr(self):
- """
- L{imap4.IMAP4Server.search_OR} returns true if either of the two
- expressions supplied to it returns true and returns false if neither
- does.
- """
- self.assertTrue(
- self.server.search_OR(
- ["SENTSINCE"] + self.earlierQuery +
- ["SENTSINCE"] + self.laterQuery,
- self.seq, self.msg, (None, None)))
- self.assertTrue(
- self.server.search_OR(
- ["SENTSINCE"] + self.laterQuery +
- ["SENTSINCE"] + self.earlierQuery,
- self.seq, self.msg, (None, None)))
- self.assertFalse(
- self.server.search_OR(
- ["SENTON"] + self.laterQuery +
- ["SENTSINCE"] + self.laterQuery,
- self.seq, self.msg, (None, None)))
-
-
- def test_searchNot(self):
- """
- L{imap4.IMAP4Server.search_NOT} returns the negation of the result
- of the expression supplied to it.
- """
- self.assertFalse(self.server.search_NOT(
- ["SENTSINCE"] + self.earlierQuery, self.seq, self.msg,
- (None, None)))
- self.assertTrue(self.server.search_NOT(
- ["SENTON"] + self.laterQuery, self.seq, self.msg,
- (None, None)))
-
-
-
-class TestRealm:
- theAccount = None
-
- def requestAvatar(self, avatarId, mind, *interfaces):
- return imap4.IAccount, self.theAccount, lambda: None
-
-class TestChecker:
- credentialInterfaces = (cred.credentials.IUsernameHashedPassword, cred.credentials.IUsernamePassword)
-
- users = {
- 'testuser': 'secret'
- }
-
- def requestAvatarId(self, credentials):
- if credentials.username in self.users:
- return defer.maybeDeferred(
- credentials.checkPassword, self.users[credentials.username]
- ).addCallback(self._cbCheck, credentials.username)
-
- def _cbCheck(self, result, username):
- if result:
- return username
- raise cred.error.UnauthorizedLogin()
-
-class AuthenticatorTestCase(IMAP4HelperMixin, unittest.TestCase):
- def setUp(self):
- IMAP4HelperMixin.setUp(self)
-
- realm = TestRealm()
- realm.theAccount = Account('testuser')
- portal = cred.portal.Portal(realm)
- portal.registerChecker(TestChecker())
- self.server.portal = portal
-
- self.authenticated = 0
- self.account = realm.theAccount
-
- def testCramMD5(self):
- self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
- cAuth = imap4.CramMD5ClientAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def auth():
- return self.client.authenticate('secret')
- def authed():
- self.authenticated = 1
-
- d1 = self.connected.addCallback(strip(auth))
- d1.addCallbacks(strip(authed), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d2 = self.loopback()
- d = defer.gatherResults([d1, d2])
- return d.addCallback(self._cbTestCramMD5)
-
- def _cbTestCramMD5(self, ignored):
- self.assertEqual(self.authenticated, 1)
- self.assertEqual(self.server.account, self.account)
-
- def testFailedCramMD5(self):
- self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
- cAuth = imap4.CramMD5ClientAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def misauth():
- return self.client.authenticate('not the secret')
- def authed():
- self.authenticated = 1
- def misauthed():
- self.authenticated = -1
-
- d1 = self.connected.addCallback(strip(misauth))
- d1.addCallbacks(strip(authed), strip(misauthed))
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFailedCramMD5)
-
- def _cbTestFailedCramMD5(self, ignored):
- self.assertEqual(self.authenticated, -1)
- self.assertEqual(self.server.account, None)
-
- def testLOGIN(self):
- self.server.challengers['LOGIN'] = imap4.LOGINCredentials
- cAuth = imap4.LOGINAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def auth():
- return self.client.authenticate('secret')
- def authed():
- self.authenticated = 1
-
- d1 = self.connected.addCallback(strip(auth))
- d1.addCallbacks(strip(authed), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestLOGIN)
-
- def _cbTestLOGIN(self, ignored):
- self.assertEqual(self.authenticated, 1)
- self.assertEqual(self.server.account, self.account)
-
- def testFailedLOGIN(self):
- self.server.challengers['LOGIN'] = imap4.LOGINCredentials
- cAuth = imap4.LOGINAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def misauth():
- return self.client.authenticate('not the secret')
- def authed():
- self.authenticated = 1
- def misauthed():
- self.authenticated = -1
-
- d1 = self.connected.addCallback(strip(misauth))
- d1.addCallbacks(strip(authed), strip(misauthed))
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFailedLOGIN)
-
- def _cbTestFailedLOGIN(self, ignored):
- self.assertEqual(self.authenticated, -1)
- self.assertEqual(self.server.account, None)
-
- def testPLAIN(self):
- self.server.challengers['PLAIN'] = imap4.PLAINCredentials
- cAuth = imap4.PLAINAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def auth():
- return self.client.authenticate('secret')
- def authed():
- self.authenticated = 1
-
- d1 = self.connected.addCallback(strip(auth))
- d1.addCallbacks(strip(authed), self._ebGeneral)
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestPLAIN)
-
- def _cbTestPLAIN(self, ignored):
- self.assertEqual(self.authenticated, 1)
- self.assertEqual(self.server.account, self.account)
-
- def testFailedPLAIN(self):
- self.server.challengers['PLAIN'] = imap4.PLAINCredentials
- cAuth = imap4.PLAINAuthenticator('testuser')
- self.client.registerAuthenticator(cAuth)
-
- def misauth():
- return self.client.authenticate('not the secret')
- def authed():
- self.authenticated = 1
- def misauthed():
- self.authenticated = -1
-
- d1 = self.connected.addCallback(strip(misauth))
- d1.addCallbacks(strip(authed), strip(misauthed))
- d1.addCallbacks(self._cbStopClient, self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFailedPLAIN)
-
- def _cbTestFailedPLAIN(self, ignored):
- self.assertEqual(self.authenticated, -1)
- self.assertEqual(self.server.account, None)
-
-
-
-class SASLPLAINTestCase(unittest.TestCase):
- """
- Tests for I{SASL PLAIN} authentication, as implemented by
- L{imap4.PLAINAuthenticator} and L{imap4.PLAINCredentials}.
-
- @see: U{http://www.faqs.org/rfcs/rfc2595.html}
- @see: U{http://www.faqs.org/rfcs/rfc4616.html}
- """
- def test_authenticatorChallengeResponse(self):
- """
- L{PLAINAuthenticator.challengeResponse} returns challenge strings of
- the form::
-
- NUL<authn-id>NUL<secret>
- """
- username = 'testuser'
- secret = 'secret'
- chal = 'challenge'
- cAuth = imap4.PLAINAuthenticator(username)
- response = cAuth.challengeResponse(secret, chal)
- self.assertEqual(response, '\0%s\0%s' % (username, secret))
-
-
- def test_credentialsSetResponse(self):
- """
- L{PLAINCredentials.setResponse} parses challenge strings of the
- form::
-
- NUL<authn-id>NUL<secret>
- """
- cred = imap4.PLAINCredentials()
- cred.setResponse('\0testuser\0secret')
- self.assertEqual(cred.username, 'testuser')
- self.assertEqual(cred.password, 'secret')
-
-
- def test_credentialsInvalidResponse(self):
- """
- L{PLAINCredentials.setResponse} raises L{imap4.IllegalClientResponse}
- when passed a string not of the expected form.
- """
- cred = imap4.PLAINCredentials()
- self.assertRaises(
- imap4.IllegalClientResponse, cred.setResponse, 'hello')
- self.assertRaises(
- imap4.IllegalClientResponse, cred.setResponse, 'hello\0world')
- self.assertRaises(
- imap4.IllegalClientResponse, cred.setResponse,
- 'hello\0world\0Zoom!\0')
-
-
-
-class UnsolicitedResponseTestCase(IMAP4HelperMixin, unittest.TestCase):
- def testReadWrite(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.modeChanged(1)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestReadWrite)
-
- def _cbTestReadWrite(self, ignored):
- E = self.client.events
- self.assertEqual(E, [['modeChanged', 1]])
-
- def testReadOnly(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.modeChanged(0)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestReadOnly)
-
- def _cbTestReadOnly(self, ignored):
- E = self.client.events
- self.assertEqual(E, [['modeChanged', 0]])
-
- def testFlagChange(self):
- flags = {
- 1: ['\\Answered', '\\Deleted'],
- 5: [],
- 10: ['\\Recent']
- }
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.flagsChanged(flags)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestFlagChange, flags)
-
- def _cbTestFlagChange(self, ignored, flags):
- E = self.client.events
- expect = [['flagsChanged', {x[0]: x[1]}] for x in flags.items()]
- E.sort()
- expect.sort()
- self.assertEqual(E, expect)
-
- def testNewMessages(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.newMessages(10, None)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestNewMessages)
-
- def _cbTestNewMessages(self, ignored):
- E = self.client.events
- self.assertEqual(E, [['newMessages', 10, None]])
-
- def testNewRecentMessages(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.newMessages(None, 10)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestNewRecentMessages)
-
- def _cbTestNewRecentMessages(self, ignored):
- E = self.client.events
- self.assertEqual(E, [['newMessages', None, 10]])
-
- def testNewMessagesAndRecent(self):
- def login():
- return self.client.login('testuser', 'password-test')
- def loggedIn():
- self.server.newMessages(20, 10)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
- d = defer.gatherResults([self.loopback(), d1])
- return d.addCallback(self._cbTestNewMessagesAndRecent)
-
- def _cbTestNewMessagesAndRecent(self, ignored):
- E = self.client.events
- self.assertEqual(E, [['newMessages', 20, None], ['newMessages', None, 10]])
-
-
-class ClientCapabilityTests(unittest.TestCase):
- """
- Tests for issuance of the CAPABILITY command and handling of its response.
- """
- def setUp(self):
- """
- Create an L{imap4.IMAP4Client} connected to a L{StringTransport}.
- """
- self.transport = StringTransport()
- self.protocol = imap4.IMAP4Client()
- self.protocol.makeConnection(self.transport)
- self.protocol.dataReceived('* OK [IMAP4rev1]\r\n')
-
-
- def test_simpleAtoms(self):
- """
- A capability response consisting only of atoms without C{'='} in them
- should result in a dict mapping those atoms to C{None}.
- """
- capabilitiesResult = self.protocol.getCapabilities(useCache=False)
- self.protocol.dataReceived('* CAPABILITY IMAP4rev1 LOGINDISABLED\r\n')
- self.protocol.dataReceived('0001 OK Capability completed.\r\n')
- def gotCapabilities(capabilities):
- self.assertEqual(
- capabilities, {'IMAP4rev1': None, 'LOGINDISABLED': None})
- capabilitiesResult.addCallback(gotCapabilities)
- return capabilitiesResult
-
-
- def test_categoryAtoms(self):
- """
- A capability response consisting of atoms including C{'='} should have
- those atoms split on that byte and have capabilities in the same
- category aggregated into lists in the resulting dictionary.
-
- (n.b. - I made up the word "category atom"; the protocol has no notion
- of structure here, but rather allows each capability to define the
- semantics of its entry in the capability response in a freeform manner.
- If I had realized this earlier, the API for capabilities would look
- different. As it is, we can hope that no one defines any crazy
- semantics which are incompatible with this API, or try to figure out a
- better API when someone does. -exarkun)
- """
- capabilitiesResult = self.protocol.getCapabilities(useCache=False)
- self.protocol.dataReceived('* CAPABILITY IMAP4rev1 AUTH=LOGIN AUTH=PLAIN\r\n')
- self.protocol.dataReceived('0001 OK Capability completed.\r\n')
- def gotCapabilities(capabilities):
- self.assertEqual(
- capabilities, {'IMAP4rev1': None, 'AUTH': ['LOGIN', 'PLAIN']})
- capabilitiesResult.addCallback(gotCapabilities)
- return capabilitiesResult
-
-
- def test_mixedAtoms(self):
- """
- A capability response consisting of both simple and category atoms of
- the same type should result in a list containing C{None} as well as the
- values for the category.
- """
- capabilitiesResult = self.protocol.getCapabilities(useCache=False)
- # Exercise codepath for both orderings of =-having and =-missing
- # capabilities.
- self.protocol.dataReceived(
- '* CAPABILITY IMAP4rev1 FOO FOO=BAR BAR=FOO BAR\r\n')
- self.protocol.dataReceived('0001 OK Capability completed.\r\n')
- def gotCapabilities(capabilities):
- self.assertEqual(capabilities, {'IMAP4rev1': None,
- 'FOO': [None, 'BAR'],
- 'BAR': ['FOO', None]})
- capabilitiesResult.addCallback(gotCapabilities)
- return capabilitiesResult
-
-
-
-class StillSimplerClient(imap4.IMAP4Client):
- """
- An IMAP4 client which keeps track of unsolicited flag changes.
- """
- def __init__(self):
- imap4.IMAP4Client.__init__(self)
- self.flags = {}
-
-
- def flagsChanged(self, newFlags):
- self.flags.update(newFlags)
-
-
-
-class HandCraftedTestCase(IMAP4HelperMixin, unittest.TestCase):
- def testTrailingLiteral(self):
- transport = StringTransport()
- c = imap4.IMAP4Client()
- c.makeConnection(transport)
- c.lineReceived('* OK [IMAP4rev1]')
-
- def cbSelect(ignored):
- d = c.fetchMessage('1')
- c.dataReceived('* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n')
- c.dataReceived('0003 OK FETCH\r\n')
- return d
-
- def cbLogin(ignored):
- d = c.select('inbox')
- c.lineReceived('0002 OK SELECT')
- d.addCallback(cbSelect)
- return d
-
- d = c.login('blah', 'blah')
- c.dataReceived('0001 OK LOGIN\r\n')
- d.addCallback(cbLogin)
- return d
-
-
- def testPathelogicalScatteringOfLiterals(self):
- self.server.checker.addUser('testuser', 'password-test')
- transport = StringTransport()
- self.server.makeConnection(transport)
-
- transport.clear()
- self.server.dataReceived("01 LOGIN {8}\r\n")
- self.assertEqual(transport.value(), "+ Ready for 8 octets of text\r\n")
-
- transport.clear()
- self.server.dataReceived("testuser {13}\r\n")
- self.assertEqual(transport.value(), "+ Ready for 13 octets of text\r\n")
-
- transport.clear()
- self.server.dataReceived("password-test\r\n")
- self.assertEqual(transport.value(), "01 OK LOGIN succeeded\r\n")
- self.assertEqual(self.server.state, 'auth')
-
- self.server.connectionLost(error.ConnectionDone("Connection done."))
-
-
- def test_unsolicitedResponseMixedWithSolicitedResponse(self):
- """
- If unsolicited data is received along with solicited data in the
- response to a I{FETCH} command issued by L{IMAP4Client.fetchSpecific},
- the unsolicited data is passed to the appropriate callback and not
- included in the result with wihch the L{Deferred} returned by
- L{IMAP4Client.fetchSpecific} fires.
- """
- transport = StringTransport()
- c = StillSimplerClient()
- c.makeConnection(transport)
- c.lineReceived('* OK [IMAP4rev1]')
-
- def login():
- d = c.login('blah', 'blah')
- c.dataReceived('0001 OK LOGIN\r\n')
- return d
- def select():
- d = c.select('inbox')
- c.lineReceived('0002 OK SELECT')
- return d
- def fetch():
- d = c.fetchSpecific('1:*',
- headerType='HEADER.FIELDS',
- headerArgs=['SUBJECT'])
- c.dataReceived('* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {38}\r\n')
- c.dataReceived('Subject: Suprise for your woman...\r\n')
- c.dataReceived('\r\n')
- c.dataReceived(')\r\n')
- c.dataReceived('* 1 FETCH (FLAGS (\Seen))\r\n')
- c.dataReceived('* 2 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {75}\r\n')
- c.dataReceived('Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n')
- c.dataReceived('\r\n')
- c.dataReceived(')\r\n')
- c.dataReceived('0003 OK FETCH completed\r\n')
- return d
- def test(res):
- self.assertEqual(res, {
- 1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
- 'Subject: Suprise for your woman...\r\n\r\n']],
- 2: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
- 'Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n\r\n']]
- })
-
- self.assertEqual(c.flags, {1: ['\\Seen']})
-
- return login(
- ).addCallback(strip(select)
- ).addCallback(strip(fetch)
- ).addCallback(test)
-
-
- def test_literalWithoutPrecedingWhitespace(self):
- """
- Literals should be recognized even when they are not preceded by
- whitespace.
- """
- transport = StringTransport()
- protocol = imap4.IMAP4Client()
-
- protocol.makeConnection(transport)
- protocol.lineReceived('* OK [IMAP4rev1]')
-
- def login():
- d = protocol.login('blah', 'blah')
- protocol.dataReceived('0001 OK LOGIN\r\n')
- return d
- def select():
- d = protocol.select('inbox')
- protocol.lineReceived('0002 OK SELECT')
- return d
- def fetch():
- d = protocol.fetchSpecific('1:*',
- headerType='HEADER.FIELDS',
- headerArgs=['SUBJECT'])
- protocol.dataReceived(
- '* 1 FETCH (BODY[HEADER.FIELDS ({7}\r\nSUBJECT)] "Hello")\r\n')
- protocol.dataReceived('0003 OK FETCH completed\r\n')
- return d
- def test(result):
- self.assertEqual(
- result, {1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']], 'Hello']]})
-
- d = login()
- d.addCallback(strip(select))
- d.addCallback(strip(fetch))
- d.addCallback(test)
- return d
-
-
- def test_nonIntegerLiteralLength(self):
- """
- If the server sends a literal length which cannot be parsed as an
- integer, L{IMAP4Client.lineReceived} should cause the protocol to be
- disconnected by raising L{imap4.IllegalServerResponse}.
- """
- transport = StringTransport()
- protocol = imap4.IMAP4Client()
-
- protocol.makeConnection(transport)
- protocol.lineReceived('* OK [IMAP4rev1]')
-
- def login():
- d = protocol.login('blah', 'blah')
- protocol.dataReceived('0001 OK LOGIN\r\n')
- return d
- def select():
- d = protocol.select('inbox')
- protocol.lineReceived('0002 OK SELECT')
- return d
- def fetch():
- d = protocol.fetchSpecific('1:*',
- headerType='HEADER.FIELDS',
- headerArgs=['SUBJECT'])
- self.assertRaises(
- imap4.IllegalServerResponse,
- protocol.dataReceived,
- '* 1 FETCH {xyz}\r\n...')
- d = login()
- d.addCallback(strip(select))
- d.addCallback(strip(fetch))
- return d
-
-
- def test_flagsChangedInsideFetchSpecificResponse(self):
- """
- Any unrequested flag information received along with other requested
- information in an untagged I{FETCH} received in response to a request
- issued with L{IMAP4Client.fetchSpecific} is passed to the
- C{flagsChanged} callback.
- """
- transport = StringTransport()
- c = StillSimplerClient()
- c.makeConnection(transport)
- c.lineReceived('* OK [IMAP4rev1]')
-
- def login():
- d = c.login('blah', 'blah')
- c.dataReceived('0001 OK LOGIN\r\n')
- return d
- def select():
- d = c.select('inbox')
- c.lineReceived('0002 OK SELECT')
- return d
- def fetch():
- d = c.fetchSpecific('1:*',
- headerType='HEADER.FIELDS',
- headerArgs=['SUBJECT'])
- # This response includes FLAGS after the requested data.
- c.dataReceived('* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {22}\r\n')
- c.dataReceived('Subject: subject one\r\n')
- c.dataReceived(' FLAGS (\\Recent))\r\n')
- # And this one includes it before! Either is possible.
- c.dataReceived('* 2 FETCH (FLAGS (\\Seen) BODY[HEADER.FIELDS ("SUBJECT")] {22}\r\n')
- c.dataReceived('Subject: subject two\r\n')
- c.dataReceived(')\r\n')
- c.dataReceived('0003 OK FETCH completed\r\n')
- return d
-
- def test(res):
- self.assertEqual(res, {
- 1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
- 'Subject: subject one\r\n']],
- 2: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
- 'Subject: subject two\r\n']]
- })
-
- self.assertEqual(c.flags, {1: ['\\Recent'], 2: ['\\Seen']})
-
- return login(
- ).addCallback(strip(select)
- ).addCallback(strip(fetch)
- ).addCallback(test)
-
-
- def test_flagsChangedInsideFetchMessageResponse(self):
- """
- Any unrequested flag information received along with other requested
- information in an untagged I{FETCH} received in response to a request
- issued with L{IMAP4Client.fetchMessage} is passed to the
- C{flagsChanged} callback.
- """
- transport = StringTransport()
- c = StillSimplerClient()
- c.makeConnection(transport)
- c.lineReceived('* OK [IMAP4rev1]')
-
- def login():
- d = c.login('blah', 'blah')
- c.dataReceived('0001 OK LOGIN\r\n')
- return d
- def select():
- d = c.select('inbox')
- c.lineReceived('0002 OK SELECT')
- return d
- def fetch():
- d = c.fetchMessage('1:*')
- c.dataReceived('* 1 FETCH (RFC822 {24}\r\n')
- c.dataReceived('Subject: first subject\r\n')
- c.dataReceived(' FLAGS (\Seen))\r\n')
- c.dataReceived('* 2 FETCH (FLAGS (\Recent \Seen) RFC822 {25}\r\n')
- c.dataReceived('Subject: second subject\r\n')
- c.dataReceived(')\r\n')
- c.dataReceived('0003 OK FETCH completed\r\n')
- return d
-
- def test(res):
- self.assertEqual(res, {
- 1: {'RFC822': 'Subject: first subject\r\n'},
- 2: {'RFC822': 'Subject: second subject\r\n'}})
-
- self.assertEqual(
- c.flags, {1: ['\\Seen'], 2: ['\\Recent', '\\Seen']})
-
- return login(
- ).addCallback(strip(select)
- ).addCallback(strip(fetch)
- ).addCallback(test)
-
-
-
-class PreauthIMAP4ClientMixin:
- """
- Mixin for L{unittest.TestCase} subclasses which provides a C{setUp} method
- which creates an L{IMAP4Client} connected to a L{StringTransport} and puts
- it into the I{authenticated} state.
-
- @ivar transport: A L{StringTransport} to which C{client} is connected.
- @ivar client: An L{IMAP4Client} which is connected to C{transport}.
- """
- clientProtocol = imap4.IMAP4Client
-
- def setUp(self):
- """
- Create an IMAP4Client connected to a fake transport and in the
- authenticated state.
- """
- self.transport = StringTransport()
- self.client = self.clientProtocol()
- self.client.makeConnection(self.transport)
- self.client.dataReceived('* PREAUTH Hello unittest\r\n')
-
-
- def _extractDeferredResult(self, d):
- """
- Synchronously extract the result of the given L{Deferred}. Fail the
- test if that is not possible.
- """
- result = []
- error = []
- d.addCallbacks(result.append, error.append)
- if result:
- return result[0]
- elif error:
- error[0].raiseException()
- else:
- self.fail("Expected result not available")
-
-
-
-class SelectionTestsMixin(PreauthIMAP4ClientMixin):
- """
- Mixin for test cases which defines tests which apply to both I{EXAMINE} and
- I{SELECT} support.
- """
- def _examineOrSelect(self):
- """
- Issue either an I{EXAMINE} or I{SELECT} command (depending on
- C{self.method}), assert that the correct bytes are written to the
- transport, and return the L{Deferred} returned by whichever method was
- called.
- """
- d = getattr(self.client, self.method)('foobox')
- self.assertEqual(
- self.transport.value(), '0001 %s foobox\r\n' % (self.command,))
- return d
-
-
- def _response(self, *lines):
- """
- Deliver the given (unterminated) response lines to C{self.client} and
- then deliver a tagged SELECT or EXAMINE completion line to finish the
- SELECT or EXAMINE response.
- """
- for line in lines:
- self.client.dataReceived(line + '\r\n')
- self.client.dataReceived(
- '0001 OK [READ-ONLY] %s completed\r\n' % (self.command,))
-
-
- def test_exists(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{EXISTS} response, the L{Deferred} return by L{IMAP4Client.select} or
- L{IMAP4Client.examine} fires with a C{dict} including the value
- associated with the C{'EXISTS'} key.
- """
- d = self._examineOrSelect()
- self._response('* 3 EXISTS')
- self.assertEqual(
- self._extractDeferredResult(d),
- {'READ-WRITE': False, 'EXISTS': 3})
-
-
- def test_nonIntegerExists(self):
- """
- If the server returns a non-integer EXISTS value in its response to a
- I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by
- L{IMAP4Client.select} or L{IMAP4Client.examine} fails with
- L{IllegalServerResponse}.
- """
- d = self._examineOrSelect()
- self._response('* foo EXISTS')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_recent(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{RECENT} response, the L{Deferred} return by L{IMAP4Client.select} or
- L{IMAP4Client.examine} fires with a C{dict} including the value
- associated with the C{'RECENT'} key.
- """
- d = self._examineOrSelect()
- self._response('* 5 RECENT')
- self.assertEqual(
- self._extractDeferredResult(d),
- {'READ-WRITE': False, 'RECENT': 5})
-
-
- def test_nonIntegerRecent(self):
- """
- If the server returns a non-integer RECENT value in its response to a
- I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by
- L{IMAP4Client.select} or L{IMAP4Client.examine} fails with
- L{IllegalServerResponse}.
- """
- d = self._examineOrSelect()
- self._response('* foo RECENT')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_unseen(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{UNSEEN} response, the L{Deferred} returned by L{IMAP4Client.select} or
- L{IMAP4Client.examine} fires with a C{dict} including the value
- associated with the C{'UNSEEN'} key.
- """
- d = self._examineOrSelect()
- self._response('* OK [UNSEEN 8] Message 8 is first unseen')
- self.assertEqual(
- self._extractDeferredResult(d),
- {'READ-WRITE': False, 'UNSEEN': 8})
-
-
- def test_nonIntegerUnseen(self):
- """
- If the server returns a non-integer UNSEEN value in its response to a
- I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by
- L{IMAP4Client.select} or L{IMAP4Client.examine} fails with
- L{IllegalServerResponse}.
- """
- d = self._examineOrSelect()
- self._response('* OK [UNSEEN foo] Message foo is first unseen')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_uidvalidity(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{UIDVALIDITY} response, the L{Deferred} returned by
- L{IMAP4Client.select} or L{IMAP4Client.examine} fires with a C{dict}
- including the value associated with the C{'UIDVALIDITY'} key.
- """
- d = self._examineOrSelect()
- self._response('* OK [UIDVALIDITY 12345] UIDs valid')
- self.assertEqual(
- self._extractDeferredResult(d),
- {'READ-WRITE': False, 'UIDVALIDITY': 12345})
-
-
- def test_nonIntegerUIDVALIDITY(self):
- """
- If the server returns a non-integer UIDVALIDITY value in its response to
- a I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by
- L{IMAP4Client.select} or L{IMAP4Client.examine} fails with
- L{IllegalServerResponse}.
- """
- d = self._examineOrSelect()
- self._response('* OK [UIDVALIDITY foo] UIDs valid')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_uidnext(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{UIDNEXT} response, the L{Deferred} returned by L{IMAP4Client.select}
- or L{IMAP4Client.examine} fires with a C{dict} including the value
- associated with the C{'UIDNEXT'} key.
- """
- d = self._examineOrSelect()
- self._response('* OK [UIDNEXT 4392] Predicted next UID')
- self.assertEqual(
- self._extractDeferredResult(d),
- {'READ-WRITE': False, 'UIDNEXT': 4392})
-
-
- def test_nonIntegerUIDNEXT(self):
- """
- If the server returns a non-integer UIDNEXT value in its response to a
- I{SELECT} or I{EXAMINE} command, the L{Deferred} returned by
- L{IMAP4Client.select} or L{IMAP4Client.examine} fails with
- L{IllegalServerResponse}.
- """
- d = self._examineOrSelect()
- self._response('* OK [UIDNEXT foo] Predicted next UID')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_flags(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{FLAGS} response, the L{Deferred} returned by L{IMAP4Client.select} or
- L{IMAP4Client.examine} fires with a C{dict} including the value
- associated with the C{'FLAGS'} key.
- """
- d = self._examineOrSelect()
- self._response(
- '* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)')
- self.assertEqual(
- self._extractDeferredResult(d), {
- 'READ-WRITE': False,
- 'FLAGS': ('\\Answered', '\\Flagged', '\\Deleted', '\\Seen',
- '\\Draft')})
-
-
- def test_permanentflags(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{FLAGS} response, the L{Deferred} returned by L{IMAP4Client.select} or
- L{IMAP4Client.examine} fires with a C{dict} including the value
- associated with the C{'FLAGS'} key.
- """
- d = self._examineOrSelect()
- self._response(
- '* OK [PERMANENTFLAGS (\\Starred)] Just one permanent flag in '
- 'that list up there')
- self.assertEqual(
- self._extractDeferredResult(d), {
- 'READ-WRITE': False,
- 'PERMANENTFLAGS': ('\\Starred',)})
-
-
- def test_unrecognizedOk(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{OK} with unrecognized response code text, parsing does not fail.
- """
- d = self._examineOrSelect()
- self._response(
- '* OK [X-MADE-UP] I just made this response text up.')
- # The value won't show up in the result. It would be okay if it did
- # someday, perhaps. This shouldn't ever happen, though.
- self.assertEqual(
- self._extractDeferredResult(d), {'READ-WRITE': False})
-
-
- def test_bareOk(self):
- """
- If the server response to a I{SELECT} or I{EXAMINE} command includes an
- I{OK} with no response code text, parsing does not fail.
- """
- d = self._examineOrSelect()
- self._response('* OK')
- self.assertEqual(
- self._extractDeferredResult(d), {'READ-WRITE': False})
-
-
-
-class IMAP4ClientExamineTests(SelectionTestsMixin, unittest.TestCase):
- """
- Tests for the L{IMAP4Client.examine} method.
-
- An example of usage of the EXAMINE command from RFC 3501, section 6.3.2::
-
- S: * 17 EXISTS
- S: * 2 RECENT
- S: * OK [UNSEEN 8] Message 8 is first unseen
- S: * OK [UIDVALIDITY 3857529045] UIDs valid
- S: * OK [UIDNEXT 4392] Predicted next UID
- S: * FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)
- S: * OK [PERMANENTFLAGS ()] No permanent flags permitted
- S: A932 OK [READ-ONLY] EXAMINE completed
- """
- method = 'examine'
- command = 'EXAMINE'
-
-
-
-
-class IMAP4ClientSelectTests(SelectionTestsMixin, unittest.TestCase):
- """
- Tests for the L{IMAP4Client.select} method.
-
- An example of usage of the SELECT command from RFC 3501, section 6.3.1::
-
- C: A142 SELECT INBOX
- S: * 172 EXISTS
- S: * 1 RECENT
- S: * OK [UNSEEN 12] Message 12 is first unseen
- S: * OK [UIDVALIDITY 3857529045] UIDs valid
- S: * OK [UIDNEXT 4392] Predicted next UID
- S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft)
- S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited
- S: A142 OK [READ-WRITE] SELECT completed
- """
- method = 'select'
- command = 'SELECT'
-
-
-
-class IMAP4ClientExpungeTests(PreauthIMAP4ClientMixin, unittest.TestCase):
- """
- Tests for the L{IMAP4Client.expunge} method.
-
- An example of usage of the EXPUNGE command from RFC 3501, section 6.4.3::
-
- C: A202 EXPUNGE
- S: * 3 EXPUNGE
- S: * 3 EXPUNGE
- S: * 5 EXPUNGE
- S: * 8 EXPUNGE
- S: A202 OK EXPUNGE completed
- """
- def _expunge(self):
- d = self.client.expunge()
- self.assertEqual(self.transport.value(), '0001 EXPUNGE\r\n')
- self.transport.clear()
- return d
-
-
- def _response(self, sequenceNumbers):
- for number in sequenceNumbers:
- self.client.lineReceived('* %s EXPUNGE' % (number,))
- self.client.lineReceived('0001 OK EXPUNGE COMPLETED')
-
-
- def test_expunge(self):
- """
- L{IMAP4Client.expunge} sends the I{EXPUNGE} command and returns a
- L{Deferred} which fires with a C{list} of message sequence numbers
- given by the server's response.
- """
- d = self._expunge()
- self._response([3, 3, 5, 8])
- self.assertEqual(self._extractDeferredResult(d), [3, 3, 5, 8])
-
-
- def test_nonIntegerExpunged(self):
- """
- If the server responds with a non-integer where a message sequence
- number is expected, the L{Deferred} returned by L{IMAP4Client.expunge}
- fails with L{IllegalServerResponse}.
- """
- d = self._expunge()
- self._response([3, 3, 'foo', 8])
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
-
-class IMAP4ClientSearchTests(PreauthIMAP4ClientMixin, unittest.TestCase):
- """
- Tests for the L{IMAP4Client.search} method.
-
- An example of usage of the SEARCH command from RFC 3501, section 6.4.4::
-
- C: A282 SEARCH FLAGGED SINCE 1-Feb-1994 NOT FROM "Smith"
- S: * SEARCH 2 84 882
- S: A282 OK SEARCH completed
- C: A283 SEARCH TEXT "string not in mailbox"
- S: * SEARCH
- S: A283 OK SEARCH completed
- C: A284 SEARCH CHARSET UTF-8 TEXT {6}
- C: XXXXXX
- S: * SEARCH 43
- S: A284 OK SEARCH completed
- """
- def _search(self):
- d = self.client.search(imap4.Query(text="ABCDEF"))
- self.assertEqual(
- self.transport.value(), '0001 SEARCH (TEXT "ABCDEF")\r\n')
- return d
-
-
- def _response(self, messageNumbers):
- self.client.lineReceived(
- "* SEARCH " + " ".join(map(str, messageNumbers)))
- self.client.lineReceived("0001 OK SEARCH completed")
-
-
- def test_search(self):
- """
- L{IMAP4Client.search} sends the I{SEARCH} command and returns a
- L{Deferred} which fires with a C{list} of message sequence numbers
- given by the server's response.
- """
- d = self._search()
- self._response([2, 5, 10])
- self.assertEqual(self._extractDeferredResult(d), [2, 5, 10])
-
-
- def test_nonIntegerFound(self):
- """
- If the server responds with a non-integer where a message sequence
- number is expected, the L{Deferred} returned by L{IMAP4Client.search}
- fails with L{IllegalServerResponse}.
- """
- d = self._search()
- self._response([2, "foo", 10])
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
-
-class IMAP4ClientFetchTests(PreauthIMAP4ClientMixin, unittest.TestCase):
- """
- Tests for the L{IMAP4Client.fetch} method.
-
- See RFC 3501, section 6.4.5.
- """
- def test_fetchUID(self):
- """
- L{IMAP4Client.fetchUID} sends the I{FETCH UID} command and returns a
- L{Deferred} which fires with a C{dict} mapping message sequence numbers
- to C{dict}s mapping C{'UID'} to that message's I{UID} in the server's
- response.
- """
- d = self.client.fetchUID('1:7')
- self.assertEqual(self.transport.value(), '0001 FETCH 1:7 (UID)\r\n')
- self.client.lineReceived('* 2 FETCH (UID 22)')
- self.client.lineReceived('* 3 FETCH (UID 23)')
- self.client.lineReceived('* 4 FETCH (UID 24)')
- self.client.lineReceived('* 5 FETCH (UID 25)')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d), {
- 2: {'UID': '22'},
- 3: {'UID': '23'},
- 4: {'UID': '24'},
- 5: {'UID': '25'}})
-
-
- def test_fetchUIDNonIntegerFound(self):
- """
- If the server responds with a non-integer where a message sequence
- number is expected, the L{Deferred} returned by L{IMAP4Client.fetchUID}
- fails with L{IllegalServerResponse}.
- """
- d = self.client.fetchUID('1')
- self.assertEqual(self.transport.value(), '0001 FETCH 1 (UID)\r\n')
- self.client.lineReceived('* foo FETCH (UID 22)')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_incompleteFetchUIDResponse(self):
- """
- If the server responds with an incomplete I{FETCH} response line, the
- L{Deferred} returned by L{IMAP4Client.fetchUID} fails with
- L{IllegalServerResponse}.
- """
- d = self.client.fetchUID('1:7')
- self.assertEqual(self.transport.value(), '0001 FETCH 1:7 (UID)\r\n')
- self.client.lineReceived('* 2 FETCH (UID 22)')
- self.client.lineReceived('* 3 FETCH (UID)')
- self.client.lineReceived('* 4 FETCH (UID 24)')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_fetchBody(self):
- """
- L{IMAP4Client.fetchBody} sends the I{FETCH BODY} command and returns a
- L{Deferred} which fires with a C{dict} mapping message sequence numbers
- to C{dict}s mapping C{'RFC822.TEXT'} to that message's body as given in
- the server's response.
- """
- d = self.client.fetchBody('3')
- self.assertEqual(
- self.transport.value(), '0001 FETCH 3 (RFC822.TEXT)\r\n')
- self.client.lineReceived('* 3 FETCH (RFC822.TEXT "Message text")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d),
- {3: {'RFC822.TEXT': 'Message text'}})
-
-
- def test_fetchSpecific(self):
- """
- L{IMAP4Client.fetchSpecific} sends the I{BODY[]} command if no
- parameters beyond the message set to retrieve are given. It returns a
- L{Deferred} which fires with a C{dict} mapping message sequence numbers
- to C{list}s of corresponding message data given by the server's
- response.
- """
- d = self.client.fetchSpecific('7')
- self.assertEqual(
- self.transport.value(), '0001 FETCH 7 BODY[]\r\n')
- self.client.lineReceived('* 7 FETCH (BODY[] "Some body")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d), {7: [['BODY', [], "Some body"]]})
-
-
- def test_fetchSpecificPeek(self):
- """
- L{IMAP4Client.fetchSpecific} issues a I{BODY.PEEK[]} command if passed
- C{True} for the C{peek} parameter.
- """
- d = self.client.fetchSpecific('6', peek=True)
- self.assertEqual(
- self.transport.value(), '0001 FETCH 6 BODY.PEEK[]\r\n')
- # BODY.PEEK responses are just BODY
- self.client.lineReceived('* 6 FETCH (BODY[] "Some body")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d), {6: [['BODY', [], "Some body"]]})
-
-
- def test_fetchSpecificNumbered(self):
- """
- L{IMAP4Client.fetchSpecific}, when passed a sequence for for
- C{headerNumber}, sends the I{BODY[N.M]} command. It returns a
- L{Deferred} which fires with a C{dict} mapping message sequence numbers
- to C{list}s of corresponding message data given by the server's
- response.
- """
- d = self.client.fetchSpecific('7', headerNumber=(1, 2, 3))
- self.assertEqual(
- self.transport.value(), '0001 FETCH 7 BODY[1.2.3]\r\n')
- self.client.lineReceived('* 7 FETCH (BODY[1.2.3] "Some body")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d),
- {7: [['BODY', ['1.2.3'], "Some body"]]})
-
-
- def test_fetchSpecificText(self):
- """
- L{IMAP4Client.fetchSpecific}, when passed C{'TEXT'} for C{headerType},
- sends the I{BODY[TEXT]} command. It returns a L{Deferred} which fires
- with a C{dict} mapping message sequence numbers to C{list}s of
- corresponding message data given by the server's response.
- """
- d = self.client.fetchSpecific('8', headerType='TEXT')
- self.assertEqual(
- self.transport.value(), '0001 FETCH 8 BODY[TEXT]\r\n')
- self.client.lineReceived('* 8 FETCH (BODY[TEXT] "Some body")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d),
- {8: [['BODY', ['TEXT'], "Some body"]]})
-
-
- def test_fetchSpecificNumberedText(self):
- """
- If passed a value for the C{headerNumber} parameter and C{'TEXT'} for
- the C{headerType} parameter, L{IMAP4Client.fetchSpecific} sends a
- I{BODY[number.TEXT]} request and returns a L{Deferred} which fires with
- a C{dict} mapping message sequence numbers to C{list}s of message data
- given by the server's response.
- """
- d = self.client.fetchSpecific('4', headerType='TEXT', headerNumber=7)
- self.assertEqual(
- self.transport.value(), '0001 FETCH 4 BODY[7.TEXT]\r\n')
- self.client.lineReceived('* 4 FETCH (BODY[7.TEXT] "Some body")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d),
- {4: [['BODY', ['7.TEXT'], "Some body"]]})
-
-
- def test_incompleteFetchSpecificTextResponse(self):
- """
- If the server responds to a I{BODY[TEXT]} request with a I{FETCH} line
- which is truncated after the I{BODY[TEXT]} tokens, the L{Deferred}
- returned by L{IMAP4Client.fetchUID} fails with
- L{IllegalServerResponse}.
- """
- d = self.client.fetchSpecific('8', headerType='TEXT')
- self.assertEqual(
- self.transport.value(), '0001 FETCH 8 BODY[TEXT]\r\n')
- self.client.lineReceived('* 8 FETCH (BODY[TEXT])')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_fetchSpecificMIME(self):
- """
- L{IMAP4Client.fetchSpecific}, when passed C{'MIME'} for C{headerType},
- sends the I{BODY[MIME]} command. It returns a L{Deferred} which fires
- with a C{dict} mapping message sequence numbers to C{list}s of
- corresponding message data given by the server's response.
- """
- d = self.client.fetchSpecific('8', headerType='MIME')
- self.assertEqual(
- self.transport.value(), '0001 FETCH 8 BODY[MIME]\r\n')
- self.client.lineReceived('* 8 FETCH (BODY[MIME] "Some body")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d),
- {8: [['BODY', ['MIME'], "Some body"]]})
-
-
- def test_fetchSpecificPartial(self):
- """
- L{IMAP4Client.fetchSpecific}, when passed C{offset} and C{length},
- sends a partial content request (like I{BODY[TEXT]<offset.length>}).
- It returns a L{Deferred} which fires with a C{dict} mapping message
- sequence numbers to C{list}s of corresponding message data given by the
- server's response.
- """
- d = self.client.fetchSpecific(
- '9', headerType='TEXT', offset=17, length=3)
- self.assertEqual(
- self.transport.value(), '0001 FETCH 9 BODY[TEXT]<17.3>\r\n')
- self.client.lineReceived('* 9 FETCH (BODY[TEXT]<17> "foo")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d),
- {9: [['BODY', ['TEXT'], '<17>', 'foo']]})
-
-
- def test_incompleteFetchSpecificPartialResponse(self):
- """
- If the server responds to a I{BODY[TEXT]} request with a I{FETCH} line
- which is truncated after the I{BODY[TEXT]<offset>} tokens, the
- L{Deferred} returned by L{IMAP4Client.fetchUID} fails with
- L{IllegalServerResponse}.
- """
- d = self.client.fetchSpecific('8', headerType='TEXT')
- self.assertEqual(
- self.transport.value(), '0001 FETCH 8 BODY[TEXT]\r\n')
- self.client.lineReceived('* 8 FETCH (BODY[TEXT]<17>)')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertRaises(
- imap4.IllegalServerResponse, self._extractDeferredResult, d)
-
-
- def test_fetchSpecificHTML(self):
- """
- If the body of a message begins with I{<} and ends with I{>} (as,
- for example, HTML bodies typically will), this is still interpreted
- as the body by L{IMAP4Client.fetchSpecific} (and particularly, not
- as a length indicator for a response to a request for a partial
- body).
- """
- d = self.client.fetchSpecific('7')
- self.assertEqual(
- self.transport.value(), '0001 FETCH 7 BODY[]\r\n')
- self.client.lineReceived('* 7 FETCH (BODY[] "<html>test</html>")')
- self.client.lineReceived('0001 OK FETCH completed')
- self.assertEqual(
- self._extractDeferredResult(d), {7: [['BODY', [], "<html>test</html>"]]})
-
-
-
-class IMAP4ClientStoreTests(PreauthIMAP4ClientMixin, unittest.TestCase):
- """
- Tests for the L{IMAP4Client.setFlags}, L{IMAP4Client.addFlags}, and
- L{IMAP4Client.removeFlags} methods.
-
- An example of usage of the STORE command, in terms of which these three
- methods are implemented, from RFC 3501, section 6.4.6::
-
- C: A003 STORE 2:4 +FLAGS (\Deleted)
- S: * 2 FETCH (FLAGS (\Deleted \Seen))
- S: * 3 FETCH (FLAGS (\Deleted))
- S: * 4 FETCH (FLAGS (\Deleted \Flagged \Seen))
- S: A003 OK STORE completed
- """
- clientProtocol = StillSimplerClient
-
- def _flagsTest(self, method, item):
- """
- Test a non-silent flag modifying method. Call the method, assert that
- the correct bytes are sent, deliver a I{FETCH} response, and assert
- that the result of the Deferred returned by the method is correct.
-
- @param method: The name of the method to test.
- @param item: The data item which is expected to be specified.
- """
- d = getattr(self.client, method)('3', ('\\Read', '\\Seen'), False)
- self.assertEqual(
- self.transport.value(),
- '0001 STORE 3 ' + item + ' (\\Read \\Seen)\r\n')
- self.client.lineReceived('* 3 FETCH (FLAGS (\\Read \\Seen))')
- self.client.lineReceived('0001 OK STORE completed')
- self.assertEqual(
- self._extractDeferredResult(d),
- {3: {'FLAGS': ['\\Read', '\\Seen']}})
-
-
- def _flagsSilentlyTest(self, method, item):
- """
- Test a silent flag modifying method. Call the method, assert that the
- correct bytes are sent, deliver an I{OK} response, and assert that the
- result of the Deferred returned by the method is correct.
-
- @param method: The name of the method to test.
- @param item: The data item which is expected to be specified.
- """
- d = getattr(self.client, method)('3', ('\\Read', '\\Seen'), True)
- self.assertEqual(
- self.transport.value(),
- '0001 STORE 3 ' + item + ' (\\Read \\Seen)\r\n')
- self.client.lineReceived('0001 OK STORE completed')
- self.assertEqual(self._extractDeferredResult(d), {})
-
-
- def _flagsSilentlyWithUnsolicitedDataTest(self, method, item):
- """
- Test unsolicited data received in response to a silent flag modifying
- method. Call the method, assert that the correct bytes are sent,
- deliver the unsolicited I{FETCH} response, and assert that the result
- of the Deferred returned by the method is correct.
-
- @param method: The name of the method to test.
- @param item: The data item which is expected to be specified.
- """
- d = getattr(self.client, method)('3', ('\\Read', '\\Seen'), True)
- self.assertEqual(
- self.transport.value(),
- '0001 STORE 3 ' + item + ' (\\Read \\Seen)\r\n')
- self.client.lineReceived('* 2 FETCH (FLAGS (\\Read \\Seen))')
- self.client.lineReceived('0001 OK STORE completed')
- self.assertEqual(self._extractDeferredResult(d), {})
- self.assertEqual(self.client.flags, {2: ['\\Read', '\\Seen']})
-
-
- def test_setFlags(self):
- """
- When passed a C{False} value for the C{silent} parameter,
- L{IMAP4Client.setFlags} sends the I{STORE} command with a I{FLAGS} data
- item and returns a L{Deferred} which fires with a C{dict} mapping
- message sequence numbers to C{dict}s mapping C{'FLAGS'} to the new
- flags of those messages.
- """
- self._flagsTest('setFlags', 'FLAGS')
-
-
- def test_setFlagsSilently(self):
- """
- When passed a C{True} value for the C{silent} parameter,
- L{IMAP4Client.setFlags} sends the I{STORE} command with a
- I{FLAGS.SILENT} data item and returns a L{Deferred} which fires with an
- empty dictionary.
- """
- self._flagsSilentlyTest('setFlags', 'FLAGS.SILENT')
-
-
- def test_setFlagsSilentlyWithUnsolicitedData(self):
- """
- If unsolicited flag data is received in response to a I{STORE}
- I{FLAGS.SILENT} request, that data is passed to the C{flagsChanged}
- callback.
- """
- self._flagsSilentlyWithUnsolicitedDataTest('setFlags', 'FLAGS.SILENT')
-
-
- def test_addFlags(self):
- """
- L{IMAP4Client.addFlags} is like L{IMAP4Client.setFlags}, but sends
- I{+FLAGS} instead of I{FLAGS}.
- """
- self._flagsTest('addFlags', '+FLAGS')
-
-
- def test_addFlagsSilently(self):
- """
- L{IMAP4Client.addFlags} with a C{True} value for C{silent} behaves like
- L{IMAP4Client.setFlags} with a C{True} value for C{silent}, but it
- sends I{+FLAGS.SILENT} instead of I{FLAGS.SILENT}.
- """
- self._flagsSilentlyTest('addFlags', '+FLAGS.SILENT')
-
-
- def test_addFlagsSilentlyWithUnsolicitedData(self):
- """
- L{IMAP4Client.addFlags} behaves like L{IMAP4Client.setFlags} when used
- in silent mode and unsolicited data is received.
- """
- self._flagsSilentlyWithUnsolicitedDataTest('addFlags', '+FLAGS.SILENT')
-
-
- def test_removeFlags(self):
- """
- L{IMAP4Client.removeFlags} is like L{IMAP4Client.setFlags}, but sends
- I{-FLAGS} instead of I{FLAGS}.
- """
- self._flagsTest('removeFlags', '-FLAGS')
-
-
- def test_removeFlagsSilently(self):
- """
- L{IMAP4Client.removeFlags} with a C{True} value for C{silent} behaves
- like L{IMAP4Client.setFlags} with a C{True} value for C{silent}, but it
- sends I{-FLAGS.SILENT} instead of I{FLAGS.SILENT}.
- """
- self._flagsSilentlyTest('removeFlags', '-FLAGS.SILENT')
-
-
- def test_removeFlagsSilentlyWithUnsolicitedData(self):
- """
- L{IMAP4Client.removeFlags} behaves like L{IMAP4Client.setFlags} when
- used in silent mode and unsolicited data is received.
- """
- self._flagsSilentlyWithUnsolicitedDataTest('removeFlags', '-FLAGS.SILENT')
-
-
-
-class FakeyServer(imap4.IMAP4Server):
- state = 'select'
- timeout = None
-
- def sendServerGreeting(self):
- pass
-
-class FakeyMessage(util.FancyStrMixin):
- implements(imap4.IMessage)
-
- showAttributes = ('headers', 'flags', 'date', 'body', 'uid')
-
- def __init__(self, headers, flags, date, body, uid, subpart):
- self.headers = headers
- self.flags = flags
- self._body = body
- self.size = len(body)
- self.date = date
- self.uid = uid
- self.subpart = subpart
-
- def getHeaders(self, negate, *names):
- self.got_headers = negate, names
- return self.headers
-
- def getFlags(self):
- return self.flags
-
- def getInternalDate(self):
- return self.date
-
- def getBodyFile(self):
- return StringIO(self._body)
-
- def getSize(self):
- return self.size
-
- def getUID(self):
- return self.uid
-
- def isMultipart(self):
- return self.subpart is not None
-
- def getSubPart(self, part):
- self.got_subpart = part
- return self.subpart[part]
-
-class NewStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
- result = None
- storeArgs = None
-
- def setUp(self):
- self.received_messages = self.received_uid = None
-
- self.server = imap4.IMAP4Server()
- self.server.state = 'select'
- self.server.mbox = self
- self.connected = defer.Deferred()
- self.client = SimpleClient(self.connected)
-
- def addListener(self, x):
- pass
- def removeListener(self, x):
- pass
-
- def store(self, *args, **kw):
- self.storeArgs = args, kw
- return self.response
-
- def _storeWork(self):
- def connected():
- return self.function(self.messages, self.flags, self.silent, self.uid)
- def result(R):
- self.result = R
-
- self.connected.addCallback(strip(connected)
- ).addCallback(result
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- def check(ignored):
- self.assertEqual(self.result, self.expected)
- self.assertEqual(self.storeArgs, self.expectedArgs)
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(check)
- return d
-
- def testSetFlags(self, uid=0):
- self.function = self.client.setFlags
- self.messages = '1,5,9'
- self.flags = ['\\A', '\\B', 'C']
- self.silent = False
- self.uid = uid
- self.response = {
- 1: ['\\A', '\\B', 'C'],
- 5: ['\\A', '\\B', 'C'],
- 9: ['\\A', '\\B', 'C'],
- }
- self.expected = {
- 1: {'FLAGS': ['\\A', '\\B', 'C']},
- 5: {'FLAGS': ['\\A', '\\B', 'C']},
- 9: {'FLAGS': ['\\A', '\\B', 'C']},
- }
- msg = imap4.MessageSet()
- msg.add(1)
- msg.add(5)
- msg.add(9)
- self.expectedArgs = ((msg, ['\\A', '\\B', 'C'], 0), {'uid': 0})
- return self._storeWork()
-
-
-
-class GetBodyStructureTests(unittest.TestCase):
- """
- Tests for L{imap4.getBodyStructure}, a helper for constructing a list which
- directly corresponds to the wire information needed for a I{BODY} or
- I{BODYSTRUCTURE} response.
- """
- def test_singlePart(self):
- """
- L{imap4.getBodyStructure} accepts a L{IMessagePart} provider and returns
- a list giving the basic fields for the I{BODY} response for that
- message.
- """
- body = 'hello, world'
- major = 'image'
- minor = 'jpeg'
- charset = 'us-ascii'
- identifier = 'some kind of id'
- description = 'great justice'
- encoding = 'maximum'
- msg = FakeyMessage({
- 'content-type': '%s/%s; charset=%s; x=y' % (
- major, minor, charset),
- 'content-id': identifier,
- 'content-description': description,
- 'content-transfer-encoding': encoding,
- }, (), '', body, 123, None)
- structure = imap4.getBodyStructure(msg)
- self.assertEqual(
- [major, minor, ["charset", charset, 'x', 'y'], identifier,
- description, encoding, len(body)],
- structure)
-
-
- def test_singlePartExtended(self):
- """
- L{imap4.getBodyStructure} returns a list giving the basic and extended
- fields for a I{BODYSTRUCTURE} response if passed C{True} for the
- C{extended} parameter.
- """
- body = 'hello, world'
- major = 'image'
- minor = 'jpeg'
- charset = 'us-ascii'
- identifier = 'some kind of id'
- description = 'great justice'
- encoding = 'maximum'
- md5 = 'abcdefabcdef'
- msg = FakeyMessage({
- 'content-type': '%s/%s; charset=%s; x=y' % (
- major, minor, charset),
- 'content-id': identifier,
- 'content-description': description,
- 'content-transfer-encoding': encoding,
- 'content-md5': md5,
- 'content-disposition': 'attachment; name=foo; size=bar',
- 'content-language': 'fr',
- 'content-location': 'France',
- }, (), '', body, 123, None)
- structure = imap4.getBodyStructure(msg, extended=True)
- self.assertEqual(
- [major, minor, ["charset", charset, 'x', 'y'], identifier,
- description, encoding, len(body), md5,
- ['attachment', ['name', 'foo', 'size', 'bar']], 'fr', 'France'],
- structure)
-
-
- def test_singlePartWithMissing(self):
- """
- For fields with no information contained in the message headers,
- L{imap4.getBodyStructure} fills in C{None} values in its result.
- """
- major = 'image'
- minor = 'jpeg'
- body = 'hello, world'
- msg = FakeyMessage({
- 'content-type': '%s/%s' % (major, minor),
- }, (), '', body, 123, None)
- structure = imap4.getBodyStructure(msg, extended=True)
- self.assertEqual(
- [major, minor, None, None, None, None, len(body), None, None,
- None, None],
- structure)
-
-
- def test_textPart(self):
- """
- For a I{text/*} message, the number of lines in the message body are
- included after the common single-part basic fields.
- """
- body = 'hello, world\nhow are you?\ngoodbye\n'
- major = 'text'
- minor = 'jpeg'
- charset = 'us-ascii'
- identifier = 'some kind of id'
- description = 'great justice'
- encoding = 'maximum'
- msg = FakeyMessage({
- 'content-type': '%s/%s; charset=%s; x=y' % (
- major, minor, charset),
- 'content-id': identifier,
- 'content-description': description,
- 'content-transfer-encoding': encoding,
- }, (), '', body, 123, None)
- structure = imap4.getBodyStructure(msg)
- self.assertEqual(
- [major, minor, ["charset", charset, 'x', 'y'], identifier,
- description, encoding, len(body), len(body.splitlines())],
- structure)
-
-
- def test_rfc822Message(self):
- """
- For a I{message/rfc822} message, the common basic fields are followed
- by information about the contained message.
- """
- body = 'hello, world\nhow are you?\ngoodbye\n'
- major = 'text'
- minor = 'jpeg'
- charset = 'us-ascii'
- identifier = 'some kind of id'
- description = 'great justice'
- encoding = 'maximum'
- msg = FakeyMessage({
- 'content-type': '%s/%s; charset=%s; x=y' % (
- major, minor, charset),
- 'from': 'Alice <alice@example.com>',
- 'to': 'Bob <bob@example.com>',
- 'content-id': identifier,
- 'content-description': description,
- 'content-transfer-encoding': encoding,
- }, (), '', body, 123, None)
-
- container = FakeyMessage({
- 'content-type': 'message/rfc822',
- }, (), '', '', 123, [msg])
-
- structure = imap4.getBodyStructure(container)
- self.assertEqual(
- ['message', 'rfc822', None, None, None, None, 0,
- imap4.getEnvelope(msg), imap4.getBodyStructure(msg), 3],
- structure)
-
-
- def test_multiPart(self):
- """
- For a I{multipart/*} message, L{imap4.getBodyStructure} returns a list
- containing the body structure information for each part of the message
- followed by an element giving the MIME subtype of the message.
- """
- oneSubPart = FakeyMessage({
- 'content-type': 'image/jpeg; x=y',
- 'content-id': 'some kind of id',
- 'content-description': 'great justice',
- 'content-transfer-encoding': 'maximum',
- }, (), '', 'hello world', 123, None)
-
- anotherSubPart = FakeyMessage({
- 'content-type': 'text/plain; charset=us-ascii',
- }, (), '', 'some stuff', 321, None)
-
- container = FakeyMessage({
- 'content-type': 'multipart/related',
- }, (), '', '', 555, [oneSubPart, anotherSubPart])
-
- self.assertEqual(
- [imap4.getBodyStructure(oneSubPart),
- imap4.getBodyStructure(anotherSubPart),
- 'related'],
- imap4.getBodyStructure(container))
-
-
- def test_multiPartExtended(self):
- """
- When passed a I{multipart/*} message and C{True} for the C{extended}
- argument, L{imap4.getBodyStructure} includes extended structure
- information from the parts of the multipart message and extended
- structure information about the multipart message itself.
- """
- oneSubPart = FakeyMessage({
- 'content-type': 'image/jpeg; x=y',
- 'content-id': 'some kind of id',
- 'content-description': 'great justice',
- 'content-transfer-encoding': 'maximum',
- }, (), '', 'hello world', 123, None)
-
- anotherSubPart = FakeyMessage({
- 'content-type': 'text/plain; charset=us-ascii',
- }, (), '', 'some stuff', 321, None)
-
- container = FakeyMessage({
- 'content-type': 'multipart/related; foo=bar',
- 'content-language': 'es',
- 'content-location': 'Spain',
- 'content-disposition': 'attachment; name=monkeys',
- }, (), '', '', 555, [oneSubPart, anotherSubPart])
-
- self.assertEqual(
- [imap4.getBodyStructure(oneSubPart, extended=True),
- imap4.getBodyStructure(anotherSubPart, extended=True),
- 'related', ['foo', 'bar'], ['attachment', ['name', 'monkeys']],
- 'es', 'Spain'],
- imap4.getBodyStructure(container, extended=True))
-
-
-
-class NewFetchTestCase(unittest.TestCase, IMAP4HelperMixin):
- def setUp(self):
- self.received_messages = self.received_uid = None
- self.result = None
-
- self.server = imap4.IMAP4Server()
- self.server.state = 'select'
- self.server.mbox = self
- self.connected = defer.Deferred()
- self.client = SimpleClient(self.connected)
-
- def addListener(self, x):
- pass
- def removeListener(self, x):
- pass
-
- def fetch(self, messages, uid):
- self.received_messages = messages
- self.received_uid = uid
- return iter(zip(range(len(self.msgObjs)), self.msgObjs))
-
- def _fetchWork(self, uid):
- if uid:
- for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs):
- self.expected[i]['UID'] = str(msg.getUID())
-
- def result(R):
- self.result = R
-
- self.connected.addCallback(lambda _: self.function(self.messages, uid)
- ).addCallback(result
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(lambda x : self.assertEqual(self.result, self.expected))
- return d
-
- def testFetchUID(self):
- self.function = lambda m, u: self.client.fetchUID(m)
-
- self.messages = '7'
- self.msgObjs = [
- FakeyMessage({}, (), '', '', 12345, None),
- FakeyMessage({}, (), '', '', 999, None),
- FakeyMessage({}, (), '', '', 10101, None),
- ]
- self.expected = {
- 0: {'UID': '12345'},
- 1: {'UID': '999'},
- 2: {'UID': '10101'},
- }
- return self._fetchWork(0)
-
- def testFetchFlags(self, uid=0):
- self.function = self.client.fetchFlags
- self.messages = '9'
- self.msgObjs = [
- FakeyMessage({}, ['FlagA', 'FlagB', '\\FlagC'], '', '', 54321, None),
- FakeyMessage({}, ['\\FlagC', 'FlagA', 'FlagB'], '', '', 12345, None),
- ]
- self.expected = {
- 0: {'FLAGS': ['FlagA', 'FlagB', '\\FlagC']},
- 1: {'FLAGS': ['\\FlagC', 'FlagA', 'FlagB']},
- }
- return self._fetchWork(uid)
-
- def testFetchFlagsUID(self):
- return self.testFetchFlags(1)
-
- def testFetchInternalDate(self, uid=0):
- self.function = self.client.fetchInternalDate
- self.messages = '13'
- self.msgObjs = [
- FakeyMessage({}, (), 'Fri, 02 Nov 2003 21:25:10 GMT', '', 23232, None),
- FakeyMessage({}, (), 'Thu, 29 Dec 2013 11:31:52 EST', '', 101, None),
- FakeyMessage({}, (), 'Mon, 10 Mar 1992 02:44:30 CST', '', 202, None),
- FakeyMessage({}, (), 'Sat, 11 Jan 2000 14:40:24 PST', '', 303, None),
- ]
- self.expected = {
- 0: {'INTERNALDATE': '02-Nov-2003 21:25:10 +0000'},
- 1: {'INTERNALDATE': '29-Dec-2013 11:31:52 -0500'},
- 2: {'INTERNALDATE': '10-Mar-1992 02:44:30 -0600'},
- 3: {'INTERNALDATE': '11-Jan-2000 14:40:24 -0800'},
- }
- return self._fetchWork(uid)
-
- def testFetchInternalDateUID(self):
- return self.testFetchInternalDate(1)
-
-
- def test_fetchInternalDateLocaleIndependent(self):
- """
- The month name in the date is locale independent.
- """
- # Fake that we're in a language where December is not Dec
- currentLocale = locale.setlocale(locale.LC_ALL, None)
- locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
- self.addCleanup(locale.setlocale, locale.LC_ALL, currentLocale)
- return self.testFetchInternalDate(1)
-
- # if alternate locale is not available, the previous test will be skipped,
- # please install this locale for it to run. Avoid using locale.getlocale to
- # learn the current locale; its values don't round-trip well on all
- # platforms. Fortunately setlocale returns a value which does round-trip
- # well.
- currentLocale = locale.setlocale(locale.LC_ALL, None)
- try:
- locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
- except locale.Error:
- test_fetchInternalDateLocaleIndependent.skip = (
- "The es_AR.UTF8 locale is not installed.")
- else:
- locale.setlocale(locale.LC_ALL, currentLocale)
-
-
- def testFetchEnvelope(self, uid=0):
- self.function = self.client.fetchEnvelope
- self.messages = '15'
- self.msgObjs = [
- FakeyMessage({
- 'from': 'user@domain', 'to': 'resu@domain',
- 'date': 'thursday', 'subject': 'it is a message',
- 'message-id': 'id-id-id-yayaya'}, (), '', '', 65656,
- None),
- ]
- self.expected = {
- 0: {'ENVELOPE':
- ['thursday', 'it is a message',
- [[None, None, 'user', 'domain']],
- [[None, None, 'user', 'domain']],
- [[None, None, 'user', 'domain']],
- [[None, None, 'resu', 'domain']],
- None, None, None, 'id-id-id-yayaya']
- }
- }
- return self._fetchWork(uid)
-
- def testFetchEnvelopeUID(self):
- return self.testFetchEnvelope(1)
-
-
- def test_fetchBodyStructure(self, uid=0):
- """
- L{IMAP4Client.fetchBodyStructure} issues a I{FETCH BODYSTRUCTURE}
- command and returns a Deferred which fires with a structure giving the
- result of parsing the server's response. The structure is a list
- reflecting the parenthesized data sent by the server, as described by
- RFC 3501, section 7.4.2.
- """
- self.function = self.client.fetchBodyStructure
- self.messages = '3:9,10:*'
- self.msgObjs = [FakeyMessage({
- 'content-type': 'text/plain; name=thing; key="value"',
- 'content-id': 'this-is-the-content-id',
- 'content-description': 'describing-the-content-goes-here!',
- 'content-transfer-encoding': '8BIT',
- 'content-md5': 'abcdef123456',
- 'content-disposition': 'attachment; filename=monkeys',
- 'content-language': 'es',
- 'content-location': 'http://example.com/monkeys',
- }, (), '', 'Body\nText\nGoes\nHere\n', 919293, None)]
- self.expected = {0: {'BODYSTRUCTURE': [
- 'text', 'plain', ['key', 'value', 'name', 'thing'],
- 'this-is-the-content-id', 'describing-the-content-goes-here!',
- '8BIT', '20', '4', 'abcdef123456',
- ['attachment', ['filename', 'monkeys']], 'es',
- 'http://example.com/monkeys']}}
- return self._fetchWork(uid)
-
-
- def testFetchBodyStructureUID(self):
- """
- If passed C{True} for the C{uid} argument, C{fetchBodyStructure} can
- also issue a I{UID FETCH BODYSTRUCTURE} command.
- """
- return self.test_fetchBodyStructure(1)
-
-
- def test_fetchBodyStructureMultipart(self, uid=0):
- """
- L{IMAP4Client.fetchBodyStructure} can also parse the response to a
- I{FETCH BODYSTRUCTURE} command for a multipart message.
- """
- self.function = self.client.fetchBodyStructure
- self.messages = '3:9,10:*'
- innerMessage = FakeyMessage({
- 'content-type': 'text/plain; name=thing; key="value"',
- 'content-id': 'this-is-the-content-id',
- 'content-description': 'describing-the-content-goes-here!',
- 'content-transfer-encoding': '8BIT',
- 'content-language': 'fr',
- 'content-md5': '123456abcdef',
- 'content-disposition': 'inline',
- 'content-location': 'outer space',
- }, (), '', 'Body\nText\nGoes\nHere\n', 919293, None)
- self.msgObjs = [FakeyMessage({
- 'content-type': 'multipart/mixed; boundary="xyz"',
- 'content-language': 'en',
- 'content-location': 'nearby',
- }, (), '', '', 919293, [innerMessage])]
- self.expected = {0: {'BODYSTRUCTURE': [
- ['text', 'plain', ['key', 'value', 'name', 'thing'],
- 'this-is-the-content-id', 'describing-the-content-goes-here!',
- '8BIT', '20', '4', '123456abcdef', ['inline', None], 'fr',
- 'outer space'],
- 'mixed', ['boundary', 'xyz'], None, 'en', 'nearby'
- ]}}
- return self._fetchWork(uid)
-
-
- def testFetchSimplifiedBody(self, uid=0):
- self.function = self.client.fetchSimplifiedBody
- self.messages = '21'
- self.msgObjs = [FakeyMessage({}, (), '', 'Yea whatever', 91825,
- [FakeyMessage({'content-type': 'image/jpg'}, (), '',
- 'Body Body Body', None, None
- )]
- )]
- self.expected = {0:
- {'BODY':
- [None, None, None, None, None, None,
- '12'
- ]
- }
- }
-
- return self._fetchWork(uid)
-
- def testFetchSimplifiedBodyUID(self):
- return self.testFetchSimplifiedBody(1)
-
- def testFetchSimplifiedBodyText(self, uid=0):
- self.function = self.client.fetchSimplifiedBody
- self.messages = '21'
- self.msgObjs = [FakeyMessage({'content-type': 'text/plain'},
- (), '', 'Yea whatever', 91825, None)]
- self.expected = {0:
- {'BODY':
- ['text', 'plain', None, None, None, None,
- '12', '1'
- ]
- }
- }
-
- return self._fetchWork(uid)
-
- def testFetchSimplifiedBodyTextUID(self):
- return self.testFetchSimplifiedBodyText(1)
-
- def testFetchSimplifiedBodyRFC822(self, uid=0):
- self.function = self.client.fetchSimplifiedBody
- self.messages = '21'
- self.msgObjs = [FakeyMessage({'content-type': 'message/rfc822'},
- (), '', 'Yea whatever', 91825,
- [FakeyMessage({'content-type': 'image/jpg'}, (), '',
- 'Body Body Body', None, None
- )]
- )]
- self.expected = {0:
- {'BODY':
- ['message', 'rfc822', None, None, None, None,
- '12', [None, None, [[None, None, None]],
- [[None, None, None]], None, None, None,
- None, None, None], ['image', 'jpg', None,
- None, None, None, '14'], '1'
- ]
- }
- }
-
- return self._fetchWork(uid)
-
- def testFetchSimplifiedBodyRFC822UID(self):
- return self.testFetchSimplifiedBodyRFC822(1)
-
- def testFetchMessage(self, uid=0):
- self.function = self.client.fetchMessage
- self.messages = '1,3,7,10101'
- self.msgObjs = [
- FakeyMessage({'Header': 'Value'}, (), '', 'BODY TEXT\r\n', 91, None),
- ]
- self.expected = {
- 0: {'RFC822': 'Header: Value\r\n\r\nBODY TEXT\r\n'}
- }
- return self._fetchWork(uid)
-
- def testFetchMessageUID(self):
- return self.testFetchMessage(1)
-
- def testFetchHeaders(self, uid=0):
- self.function = self.client.fetchHeaders
- self.messages = '9,6,2'
- self.msgObjs = [
- FakeyMessage({'H1': 'V1', 'H2': 'V2'}, (), '', '', 99, None),
- ]
- self.expected = {
- 0: {'RFC822.HEADER': imap4._formatHeaders({'H1': 'V1', 'H2': 'V2'})},
- }
- return self._fetchWork(uid)
-
- def testFetchHeadersUID(self):
- return self.testFetchHeaders(1)
-
- def testFetchBody(self, uid=0):
- self.function = self.client.fetchBody
- self.messages = '1,2,3,4,5,6,7'
- self.msgObjs = [
- FakeyMessage({'Header': 'Value'}, (), '', 'Body goes here\r\n', 171, None),
- ]
- self.expected = {
- 0: {'RFC822.TEXT': 'Body goes here\r\n'},
- }
- return self._fetchWork(uid)
-
- def testFetchBodyUID(self):
- return self.testFetchBody(1)
-
- def testFetchBodyParts(self):
- """
- Test the server's handling of requests for specific body sections.
- """
- self.function = self.client.fetchSpecific
- self.messages = '1'
- outerBody = ''
- innerBody1 = 'Contained body message text. Squarge.'
- innerBody2 = 'Secondary <i>message</i> text of squarge body.'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'multipart/alternative; boundary="xyz"'
- innerHeaders = util.OrderedDict()
- innerHeaders['subject'] = 'this is subject text'
- innerHeaders['content-type'] = 'text/plain'
- innerHeaders2 = util.OrderedDict()
- innerHeaders2['subject'] = '<b>this is subject</b>'
- innerHeaders2['content-type'] = 'text/html'
- self.msgObjs = [FakeyMessage(
- headers, (), None, outerBody, 123,
- [FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
- FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)])]
- self.expected = {
- 0: [['BODY', ['1'], 'Contained body message text. Squarge.']]}
-
- def result(R):
- self.result = R
-
- self.connected.addCallback(
- lambda _: self.function(self.messages, headerNumber=1))
- self.connected.addCallback(result)
- self.connected.addCallback(self._cbStopClient)
- self.connected.addErrback(self._ebGeneral)
-
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(lambda ign: self.assertEqual(self.result, self.expected))
- return d
-
-
- def test_fetchBodyPartOfNonMultipart(self):
- """
- Single-part messages have an implicit first part which clients
- should be able to retrieve explicitly. Test that a client
- requesting part 1 of a text/plain message receives the body of the
- text/plain part.
- """
- self.function = self.client.fetchSpecific
- self.messages = '1'
- parts = [1]
- outerBody = 'DA body'
- headers = util.OrderedDict()
- headers['from'] = 'sender@host'
- headers['to'] = 'recipient@domain'
- headers['subject'] = 'booga booga boo'
- headers['content-type'] = 'text/plain'
- self.msgObjs = [FakeyMessage(
- headers, (), None, outerBody, 123, None)]
-
- self.expected = {0: [['BODY', ['1'], 'DA body']]}
-
- def result(R):
- self.result = R
-
- self.connected.addCallback(
- lambda _: self.function(self.messages, headerNumber=parts))
- self.connected.addCallback(result)
- self.connected.addCallback(self._cbStopClient)
- self.connected.addErrback(self._ebGeneral)
-
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(lambda ign: self.assertEqual(self.result, self.expected))
- return d
-
-
- def testFetchSize(self, uid=0):
- self.function = self.client.fetchSize
- self.messages = '1:100,2:*'
- self.msgObjs = [
- FakeyMessage({}, (), '', 'x' * 20, 123, None),
- ]
- self.expected = {
- 0: {'RFC822.SIZE': '20'},
- }
- return self._fetchWork(uid)
-
- def testFetchSizeUID(self):
- return self.testFetchSize(1)
-
- def testFetchFull(self, uid=0):
- self.function = self.client.fetchFull
- self.messages = '1,3'
- self.msgObjs = [
- FakeyMessage({}, ('\\XYZ', '\\YZX', 'Abc'),
- 'Sun, 25 Jul 2010 06:20:30 -0400 (EDT)',
- 'xyz' * 2, 654, None),
- FakeyMessage({}, ('\\One', '\\Two', 'Three'),
- 'Mon, 14 Apr 2003 19:43:44 -0400',
- 'abc' * 4, 555, None),
- ]
- self.expected = {
- 0: {'FLAGS': ['\\XYZ', '\\YZX', 'Abc'],
- 'INTERNALDATE': '25-Jul-2010 06:20:30 -0400',
- 'RFC822.SIZE': '6',
- 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
- 'BODY': [None, None, None, None, None, None, '6']},
- 1: {'FLAGS': ['\\One', '\\Two', 'Three'],
- 'INTERNALDATE': '14-Apr-2003 19:43:44 -0400',
- 'RFC822.SIZE': '12',
- 'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
- 'BODY': [None, None, None, None, None, None, '12']},
- }
- return self._fetchWork(uid)
-
- def testFetchFullUID(self):
- return self.testFetchFull(1)
-
- def testFetchAll(self, uid=0):
- self.function = self.client.fetchAll
- self.messages = '1,2:3'
- self.msgObjs = [
- FakeyMessage({}, (), 'Mon, 14 Apr 2003 19:43:44 +0400',
- 'Lalala', 10101, None),
- FakeyMessage({}, (), 'Tue, 15 Apr 2003 19:43:44 +0200',
- 'Alalal', 20202, None),
- ]
- self.expected = {
- 0: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
- 'RFC822.SIZE': '6',
- 'INTERNALDATE': '14-Apr-2003 19:43:44 +0400',
- 'FLAGS': []},
- 1: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
- 'RFC822.SIZE': '6',
- 'INTERNALDATE': '15-Apr-2003 19:43:44 +0200',
- 'FLAGS': []},
- }
- return self._fetchWork(uid)
-
- def testFetchAllUID(self):
- return self.testFetchAll(1)
-
- def testFetchFast(self, uid=0):
- self.function = self.client.fetchFast
- self.messages = '1'
- self.msgObjs = [
- FakeyMessage({}, ('\\X',), '19 Mar 2003 19:22:21 -0500', '', 9, None),
- ]
- self.expected = {
- 0: {'FLAGS': ['\\X'],
- 'INTERNALDATE': '19-Mar-2003 19:22:21 -0500',
- 'RFC822.SIZE': '0'},
- }
- return self._fetchWork(uid)
-
- def testFetchFastUID(self):
- return self.testFetchFast(1)
-
-
-
-class DefaultSearchTestCase(IMAP4HelperMixin, unittest.TestCase):
- """
- Test the behavior of the server's SEARCH implementation, particularly in
- the face of unhandled search terms.
- """
- def setUp(self):
- self.server = imap4.IMAP4Server()
- self.server.state = 'select'
- self.server.mbox = self
- self.connected = defer.Deferred()
- self.client = SimpleClient(self.connected)
- self.msgObjs = [
- FakeyMessage({}, (), '', '', 999, None),
- FakeyMessage({}, (), '', '', 10101, None),
- FakeyMessage({}, (), '', '', 12345, None),
- FakeyMessage({}, (), '', '', 20001, None),
- FakeyMessage({}, (), '', '', 20002, None),
- ]
-
-
- def fetch(self, messages, uid):
- """
- Pretend to be a mailbox and let C{self.server} lookup messages on me.
- """
- return zip(range(1, len(self.msgObjs) + 1), self.msgObjs)
-
-
- def _messageSetSearchTest(self, queryTerms, expectedMessages):
- """
- Issue a search with given query and verify that the returned messages
- match the given expected messages.
-
- @param queryTerms: A string giving the search query.
- @param expectedMessages: A list of the message sequence numbers
- expected as the result of the search.
- @return: A L{Deferred} which fires when the test is complete.
- """
- def search():
- return self.client.search(queryTerms)
-
- d = self.connected.addCallback(strip(search))
- def searched(results):
- self.assertEqual(results, expectedMessages)
- d.addCallback(searched)
- d.addCallback(self._cbStopClient)
- d.addErrback(self._ebGeneral)
- self.loopback()
- return d
-
-
- def test_searchMessageSet(self):
- """
- Test that a search which starts with a message set properly limits
- the search results to messages in that set.
- """
- return self._messageSetSearchTest('1', [1])
-
-
- def test_searchMessageSetWithStar(self):
- """
- If the search filter ends with a star, all the message from the
- starting point are returned.
- """
- return self._messageSetSearchTest('2:*', [2, 3, 4, 5])
-
-
- def test_searchMessageSetWithStarFirst(self):
- """
- If the search filter starts with a star, the result should be identical
- with if the filter would end with a star.
- """
- return self._messageSetSearchTest('*:2', [2, 3, 4, 5])
-
-
- def test_searchMessageSetUIDWithStar(self):
- """
- If the search filter ends with a star, all the message from the
- starting point are returned (also for the SEARCH UID case).
- """
- return self._messageSetSearchTest('UID 10000:*', [2, 3, 4, 5])
-
-
- def test_searchMessageSetUIDWithStarFirst(self):
- """
- If the search filter starts with a star, the result should be identical
- with if the filter would end with a star (also for the SEARCH UID case).
- """
- return self._messageSetSearchTest('UID *:10000', [2, 3, 4, 5])
-
-
- def test_searchMessageSetUIDWithStarAndHighStart(self):
- """
- A search filter of 1234:* should include the UID of the last message in
- the mailbox, even if its UID is less than 1234.
- """
- # in our fake mbox the highest message UID is 20002
- return self._messageSetSearchTest('UID 30000:*', [5])
-
-
- def test_searchMessageSetWithList(self):
- """
- If the search filter contains nesting terms, one of which includes a
- message sequence set with a wildcard, IT ALL WORKS GOOD.
- """
- # 6 is bigger than the biggest message sequence number, but that's
- # okay, because N:* includes the biggest message sequence number even
- # if N is bigger than that (read the rfc nub).
- return self._messageSetSearchTest('(6:*)', [5])
-
-
- def test_searchOr(self):
- """
- If the search filter contains an I{OR} term, all messages
- which match either subexpression are returned.
- """
- return self._messageSetSearchTest('OR 1 2', [1, 2])
-
-
- def test_searchOrMessageSet(self):
- """
- If the search filter contains an I{OR} term with a
- subexpression which includes a message sequence set wildcard,
- all messages in that set are considered for inclusion in the
- results.
- """
- return self._messageSetSearchTest('OR 2:* 2:*', [2, 3, 4, 5])
-
-
- def test_searchNot(self):
- """
- If the search filter contains a I{NOT} term, all messages
- which do not match the subexpression are returned.
- """
- return self._messageSetSearchTest('NOT 3', [1, 2, 4, 5])
-
-
- def test_searchNotMessageSet(self):
- """
- If the search filter contains a I{NOT} term with a
- subexpression which includes a message sequence set wildcard,
- no messages in that set are considered for inclusion in the
- result.
- """
- return self._messageSetSearchTest('NOT 2:*', [1])
-
-
- def test_searchAndMessageSet(self):
- """
- If the search filter contains multiple terms implicitly
- conjoined with a message sequence set wildcard, only the
- intersection of the results of each term are returned.
- """
- return self._messageSetSearchTest('2:* 3', [3])
-
- def test_searchInvalidCriteria(self):
- """
- If the search criteria is not a valid key, a NO result is returned to
- the client (resulting in an error callback), and an IllegalQueryError is
- logged on the server side.
- """
- queryTerms = 'FOO'
- def search():
- return self.client.search(queryTerms)
-
- d = self.connected.addCallback(strip(search))
- d = self.assertFailure(d, imap4.IMAP4Exception)
-
- def errorReceived(results):
- """
- Verify that the server logs an IllegalQueryError and the
- client raises an IMAP4Exception with 'Search failed:...'
- """
- self.client.transport.loseConnection()
- self.server.transport.loseConnection()
-
- # Check what the server logs
- errors = self.flushLoggedErrors(imap4.IllegalQueryError)
- self.assertEqual(len(errors), 1)
-
- # Verify exception given to client has the correct message
- self.assertEqual(
- "SEARCH failed: Invalid search command FOO", str(results))
-
- d.addCallback(errorReceived)
- d.addErrback(self._ebGeneral)
- self.loopback()
- return d
-
-
-
-class FetchSearchStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
- implements(imap4.ISearchableMailbox)
-
- def setUp(self):
- self.expected = self.result = None
- self.server_received_query = None
- self.server_received_uid = None
- self.server_received_parts = None
- self.server_received_messages = None
-
- self.server = imap4.IMAP4Server()
- self.server.state = 'select'
- self.server.mbox = self
- self.connected = defer.Deferred()
- self.client = SimpleClient(self.connected)
-
- def search(self, query, uid):
- # Look for a specific bad query, so we can verify we handle it properly
- if query == ['FOO']:
- raise imap4.IllegalQueryError("FOO is not a valid search criteria")
-
- self.server_received_query = query
- self.server_received_uid = uid
- return self.expected
-
- def addListener(self, *a, **kw):
- pass
- removeListener = addListener
-
- def _searchWork(self, uid):
- def search():
- return self.client.search(self.query, uid=uid)
- def result(R):
- self.result = R
-
- self.connected.addCallback(strip(search)
- ).addCallback(result
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- def check(ignored):
- # Ensure no short-circuiting wierdness is going on
- self.failIf(self.result is self.expected)
-
- self.assertEqual(self.result, self.expected)
- self.assertEqual(self.uid, self.server_received_uid)
- self.assertEqual(
- imap4.parseNestedParens(self.query),
- self.server_received_query
- )
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(check)
- return d
-
- def testSearch(self):
- self.query = imap4.Or(
- imap4.Query(header=('subject', 'substring')),
- imap4.Query(larger=1024, smaller=4096),
- )
- self.expected = [1, 4, 5, 7]
- self.uid = 0
- return self._searchWork(0)
-
- def testUIDSearch(self):
- self.query = imap4.Or(
- imap4.Query(header=('subject', 'substring')),
- imap4.Query(larger=1024, smaller=4096),
- )
- self.uid = 1
- self.expected = [1, 2, 3]
- return self._searchWork(1)
-
- def getUID(self, msg):
- try:
- return self.expected[msg]['UID']
- except (TypeError, IndexError):
- return self.expected[msg-1]
- except KeyError:
- return 42
-
- def fetch(self, messages, uid):
- self.server_received_uid = uid
- self.server_received_messages = str(messages)
- return self.expected
-
- def _fetchWork(self, fetch):
- def result(R):
- self.result = R
-
- self.connected.addCallback(strip(fetch)
- ).addCallback(result
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- def check(ignored):
- # Ensure no short-circuiting wierdness is going on
- self.failIf(self.result is self.expected)
-
- self.parts and self.parts.sort()
- self.server_received_parts and self.server_received_parts.sort()
-
- if self.uid:
- for (k, v) in self.expected.items():
- v['UID'] = str(k)
-
- self.assertEqual(self.result, self.expected)
- self.assertEqual(self.uid, self.server_received_uid)
- self.assertEqual(self.parts, self.server_received_parts)
- self.assertEqual(imap4.parseIdList(self.messages),
- imap4.parseIdList(self.server_received_messages))
-
- d = loopback.loopbackTCP(self.server, self.client, noisy=False)
- d.addCallback(check)
- return d
-
-
- def test_invalidTerm(self):
- """
- If, as part of a search, an ISearchableMailbox raises an
- IllegalQueryError (e.g. due to invalid search criteria), client sees a
- failure response, and an IllegalQueryError is logged on the server.
- """
- query = 'FOO'
-
- def search():
- return self.client.search(query)
-
- d = self.connected.addCallback(strip(search))
- d = self.assertFailure(d, imap4.IMAP4Exception)
-
- def errorReceived(results):
- """
- Verify that the server logs an IllegalQueryError and the
- client raises an IMAP4Exception with 'Search failed:...'
- """
- self.client.transport.loseConnection()
- self.server.transport.loseConnection()
-
- # Check what the server logs
- errors = self.flushLoggedErrors(imap4.IllegalQueryError)
- self.assertEqual(len(errors), 1)
-
- # Verify exception given to client has the correct message
- self.assertEqual(
- "SEARCH failed: FOO is not a valid search criteria",
- str(results))
-
- d.addCallback(errorReceived)
- d.addErrback(self._ebGeneral)
- self.loopback()
- return d
-
-
-
-class FakeMailbox:
- def __init__(self):
- self.args = []
- def addMessage(self, body, flags, date):
- self.args.append((body, flags, date))
- return defer.succeed(None)
-
-class FeaturefulMessage:
- implements(imap4.IMessageFile)
-
- def getFlags(self):
- return 'flags'
-
- def getInternalDate(self):
- return 'internaldate'
-
- def open(self):
- return StringIO("open")
-
-class MessageCopierMailbox:
- implements(imap4.IMessageCopier)
-
- def __init__(self):
- self.msgs = []
-
- def copy(self, msg):
- self.msgs.append(msg)
- return len(self.msgs)
-
-class CopyWorkerTestCase(unittest.TestCase):
- def testFeaturefulMessage(self):
- s = imap4.IMAP4Server()
-
- # Yes. I am grabbing this uber-non-public method to test it.
- # It is complex. It needs to be tested directly!
- # Perhaps it should be refactored, simplified, or split up into
- # not-so-private components, but that is a task for another day.
-
- # Ha ha! Addendum! Soon it will be split up, and this test will
- # be re-written to just use the default adapter for IMailbox to
- # IMessageCopier and call .copy on that adapter.
- f = s._IMAP4Server__cbCopy
-
- m = FakeMailbox()
- d = f([(i, FeaturefulMessage()) for i in range(1, 11)], 'tag', m)
-
- def cbCopy(results):
- for a in m.args:
- self.assertEqual(a[0].read(), "open")
- self.assertEqual(a[1], "flags")
- self.assertEqual(a[2], "internaldate")
-
- for (status, result) in results:
- self.failUnless(status)
- self.assertEqual(result, None)
-
- return d.addCallback(cbCopy)
-
-
- def testUnfeaturefulMessage(self):
- s = imap4.IMAP4Server()
-
- # See above comment
- f = s._IMAP4Server__cbCopy
-
- m = FakeMailbox()
- msgs = [FakeyMessage({'Header-Counter': str(i)}, (), 'Date', 'Body %d' % (i,), i + 10, None) for i in range(1, 11)]
- d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
-
- def cbCopy(results):
- seen = []
- for a in m.args:
- seen.append(a[0].read())
- self.assertEqual(a[1], ())
- self.assertEqual(a[2], "Date")
-
- seen.sort()
- exp = ["Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1, 11)]
- exp.sort()
- self.assertEqual(seen, exp)
-
- for (status, result) in results:
- self.failUnless(status)
- self.assertEqual(result, None)
-
- return d.addCallback(cbCopy)
-
- def testMessageCopier(self):
- s = imap4.IMAP4Server()
-
- # See above comment
- f = s._IMAP4Server__cbCopy
-
- m = MessageCopierMailbox()
- msgs = [object() for i in range(1, 11)]
- d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
-
- def cbCopy(results):
- self.assertEqual(results, zip([1] * 10, range(1, 11)))
- for (orig, new) in zip(msgs, m.msgs):
- self.assertIdentical(orig, new)
-
- return d.addCallback(cbCopy)
-
-
-class TLSTestCase(IMAP4HelperMixin, unittest.TestCase):
- serverCTX = ServerTLSContext and ServerTLSContext()
- clientCTX = ClientTLSContext and ClientTLSContext()
-
- def loopback(self):
- return loopback.loopbackTCP(self.server, self.client, noisy=False)
-
- def testAPileOfThings(self):
- SimpleServer.theAccount.addMailbox('inbox')
- called = []
- def login():
- called.append(None)
- return self.client.login('testuser', 'password-test')
- def list():
- called.append(None)
- return self.client.list('inbox', '%')
- def status():
- called.append(None)
- return self.client.status('inbox', 'UIDNEXT')
- def examine():
- called.append(None)
- return self.client.examine('inbox')
- def logout():
- called.append(None)
- return self.client.logout()
-
- self.client.requireTransportSecurity = True
-
- methods = [login, list, status, examine, logout]
- map(self.connected.addCallback, map(strip, methods))
- self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
- def check(ignored):
- self.assertEqual(self.server.startedTLS, True)
- self.assertEqual(self.client.startedTLS, True)
- self.assertEqual(len(called), len(methods))
- d = self.loopback()
- d.addCallback(check)
- return d
-
- def testLoginLogin(self):
- self.server.checker.addUser('testuser', 'password-test')
- success = []
- self.client.registerAuthenticator(imap4.LOGINAuthenticator('testuser'))
- self.connected.addCallback(
- lambda _: self.client.authenticate('password-test')
- ).addCallback(
- lambda _: self.client.logout()
- ).addCallback(success.append
- ).addCallback(self._cbStopClient
- ).addErrback(self._ebGeneral)
-
- d = self.loopback()
- d.addCallback(lambda x : self.assertEqual(len(success), 1))
- return d
-
-
- def test_startTLS(self):
- """
- L{IMAP4Client.startTLS} triggers TLS negotiation and returns a
- L{Deferred} which fires after the client's transport is using
- encryption.
- """
- success = []
- self.connected.addCallback(lambda _: self.client.startTLS())
- def checkSecure(ignored):
- self.assertTrue(
- interfaces.ISSLTransport.providedBy(self.client.transport))
- self.connected.addCallback(checkSecure)
- self.connected.addCallback(self._cbStopClient)
- self.connected.addCallback(success.append)
- self.connected.addErrback(self._ebGeneral)
-
- d = self.loopback()
- d.addCallback(lambda x : self.failUnless(success))
- return defer.gatherResults([d, self.connected])
-
-
- def testFailedStartTLS(self):
- failure = []
- def breakServerTLS(ign):
- self.server.canStartTLS = False
-
- self.connected.addCallback(breakServerTLS)
- self.connected.addCallback(lambda ign: self.client.startTLS())
- self.connected.addErrback(lambda err: failure.append(err.trap(imap4.IMAP4Exception)))
- self.connected.addCallback(self._cbStopClient)
- self.connected.addErrback(self._ebGeneral)
-
- def check(ignored):
- self.failUnless(failure)
- self.assertIdentical(failure[0], imap4.IMAP4Exception)
- return self.loopback().addCallback(check)
-
-
-
-class SlowMailbox(SimpleMailbox):
- howSlow = 2
- callLater = None
- fetchDeferred = None
-
- # Not a very nice implementation of fetch(), but it'll
- # do for the purposes of testing.
- def fetch(self, messages, uid):
- d = defer.Deferred()
- self.callLater(self.howSlow, d.callback, ())
- self.fetchDeferred.callback(None)
- return d
-
-class Timeout(IMAP4HelperMixin, unittest.TestCase):
-
- def test_serverTimeout(self):
- """
- The *client* has a timeout mechanism which will close connections that
- are inactive for a period.
- """
- c = Clock()
- self.server.timeoutTest = True
- self.client.timeout = 5 #seconds
- self.client.callLater = c.callLater
- self.selectedArgs = None
-
- def login():
- d = self.client.login('testuser', 'password-test')
- c.advance(5)
- d.addErrback(timedOut)
- return d
-
- def timedOut(failure):
- self._cbStopClient(None)
- failure.trap(error.TimeoutError)
-
- d = self.connected.addCallback(strip(login))
- d.addErrback(self._ebGeneral)
- return defer.gatherResults([d, self.loopback()])
-
-
- def test_longFetchDoesntTimeout(self):
- """
- The connection timeout does not take effect during fetches.
- """
- c = Clock()
- SlowMailbox.callLater = c.callLater
- SlowMailbox.fetchDeferred = defer.Deferred()
- self.server.callLater = c.callLater
- SimpleServer.theAccount.mailboxFactory = SlowMailbox
- SimpleServer.theAccount.addMailbox('mailbox-test')
-
- self.server.setTimeout(1)
-
- def login():
- return self.client.login('testuser', 'password-test')
- def select():
- self.server.setTimeout(1)
- return self.client.select('mailbox-test')
- def fetch():
- return self.client.fetchUID('1:*')
- def stillConnected():
- self.assertNotEquals(self.server.state, 'timeout')
-
- def cbAdvance(ignored):
- for i in xrange(4):
- c.advance(.5)
-
- SlowMailbox.fetchDeferred.addCallback(cbAdvance)
-
- d1 = self.connected.addCallback(strip(login))
- d1.addCallback(strip(select))
- d1.addCallback(strip(fetch))
- d1.addCallback(strip(stillConnected))
- d1.addCallback(self._cbStopClient)
- d1.addErrback(self._ebGeneral)
- d = defer.gatherResults([d1, self.loopback()])
- return d
-
-
- def test_idleClientDoesDisconnect(self):
- """
- The *server* has a timeout mechanism which will close connections that
- are inactive for a period.
- """
- c = Clock()
- # Hook up our server protocol
- transport = StringTransportWithDisconnection()
- transport.protocol = self.server
- self.server.callLater = c.callLater
- self.server.makeConnection(transport)
-
- # Make sure we can notice when the connection goes away
- lost = []
- connLost = self.server.connectionLost
- self.server.connectionLost = lambda reason: (lost.append(None), connLost(reason))[1]
-
- # 2/3rds of the idle timeout elapses...
- c.pump([0.0] + [self.server.timeOut / 3.0] * 2)
- self.failIf(lost, lost)
-
- # Now some more
- c.pump([0.0, self.server.timeOut / 2.0])
- self.failUnless(lost)
-
-
-
-class Disconnection(unittest.TestCase):
- def testClientDisconnectFailsDeferreds(self):
- c = imap4.IMAP4Client()
- t = StringTransportWithDisconnection()
- c.makeConnection(t)
- d = self.assertFailure(c.login('testuser', 'example.com'), error.ConnectionDone)
- c.connectionLost(error.ConnectionDone("Connection closed"))
- return d
-
-
-
-class SynchronousMailbox(object):
- """
- Trivial, in-memory mailbox implementation which can produce a message
- synchronously.
- """
- def __init__(self, messages):
- self.messages = messages
-
-
- def fetch(self, msgset, uid):
- assert not uid, "Cannot handle uid requests."
- for msg in msgset:
- yield msg, self.messages[msg - 1]
-
-
-
-class StringTransportConsumer(StringTransport):
- producer = None
- streaming = None
-
- def registerProducer(self, producer, streaming):
- self.producer = producer
- self.streaming = streaming
-
-
-
-class Pipelining(unittest.TestCase):
- """
- Tests for various aspects of the IMAP4 server's pipelining support.
- """
- messages = [
- FakeyMessage({}, [], '', '0', None, None),
- FakeyMessage({}, [], '', '1', None, None),
- FakeyMessage({}, [], '', '2', None, None),
- ]
-
- def setUp(self):
- self.iterators = []
-
- self.transport = StringTransportConsumer()
- self.server = imap4.IMAP4Server(None, None, self.iterateInReactor)
- self.server.makeConnection(self.transport)
-
-
- def iterateInReactor(self, iterator):
- d = defer.Deferred()
- self.iterators.append((iterator, d))
- return d
-
-
- def tearDown(self):
- self.server.connectionLost(failure.Failure(error.ConnectionDone()))
-
-
- def test_synchronousFetch(self):
- """
- Test that pipelined FETCH commands which can be responded to
- synchronously are responded to correctly.
- """
- mailbox = SynchronousMailbox(self.messages)
-
- # Skip over authentication and folder selection
- self.server.state = 'select'
- self.server.mbox = mailbox
-
- # Get rid of any greeting junk
- self.transport.clear()
-
- # Here's some pipelined stuff
- self.server.dataReceived(
- '01 FETCH 1 BODY[]\r\n'
- '02 FETCH 2 BODY[]\r\n'
- '03 FETCH 3 BODY[]\r\n')
-
- # Flush anything the server has scheduled to run
- while self.iterators:
- for e in self.iterators[0][0]:
- break
- else:
- self.iterators.pop(0)[1].callback(None)
-
- # The bodies are empty because we aren't simulating a transport
- # exactly correctly (we have StringTransportConsumer but we never
- # call resumeProducing on its producer). It doesn't matter: just
- # make sure the surrounding structure is okay, and that no
- # exceptions occurred.
- self.assertEqual(
- self.transport.value(),
- '* 1 FETCH (BODY[] )\r\n'
- '01 OK FETCH completed\r\n'
- '* 2 FETCH (BODY[] )\r\n'
- '02 OK FETCH completed\r\n'
- '* 3 FETCH (BODY[] )\r\n'
- '03 OK FETCH completed\r\n')
-
-
-
-if ClientTLSContext is None:
- for case in (TLSTestCase,):
- case.skip = "OpenSSL not present"
-elif interfaces.IReactorSSL(reactor, None) is None:
- for case in (TLSTestCase,):
- case.skip = "Reactor doesn't support SSL"