aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/mail/relaymanager.py
blob: 66c777ade4297cd0c6172772a36584e318a3b2d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
# -*- test-case-name: twisted.mail.test.test_mail -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Infrastructure for relaying mail through smart host

Today, internet e-mail has stopped being Peer-to-peer for many problems,
spam (unsolicited bulk mail) among them. Instead, most nodes on the
internet send all e-mail to a single computer, usually the ISP's though
sometimes other schemes, such as SMTP-after-POP, are used. This computer
is supposedly permanently up and traceable, and will do the work of
figuring out MXs and connecting to them. This kind of configuration
is usually termed "smart host", since the host we are connecting to
is "smart" (and will find MXs and connect to them) rather then just
accepting mail for a small set of domains.

The classes here are meant to facilitate support for such a configuration
for the twisted.mail SMTP server
"""

import rfc822
import os
import time

try:
    import cPickle as pickle
except ImportError:
    import pickle

from twisted.python import log
from twisted.python.failure import Failure
from twisted.python.compat import set
from twisted.mail import relay
from twisted.mail import bounce
from twisted.internet import protocol
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.error import DNSLookupError
from twisted.mail import smtp
from twisted.application import internet

class ManagedRelayerMixin:
    """SMTP Relayer which notifies a manager

    Notify the manager about successful mail, failed mail
    and broken connections
    """

    def __init__(self, manager):
        self.manager = manager

    def sentMail(self, code, resp, numOk, addresses, log):
        """called when e-mail has been sent

        we will always get 0 or 1 addresses.
        """
        message = self.names[0]
        if code in smtp.SUCCESS:
            self.manager.notifySuccess(self.factory, message)
        else:
            self.manager.notifyFailure(self.factory, message)
        del self.messages[0]
        del self.names[0]

    def connectionLost(self, reason):
        """called when connection is broken

        notify manager we will try to send no more e-mail
        """
        self.manager.notifyDone(self.factory)

class SMTPManagedRelayer(ManagedRelayerMixin, relay.SMTPRelayer):
    def __init__(self, messages, manager, *args, **kw):
        """
        @type messages: C{list} of C{str}
        @param messages: Filenames of messages to relay

        manager should support .notifySuccess, .notifyFailure
        and .notifyDone
        """
        ManagedRelayerMixin.__init__(self, manager)
        relay.SMTPRelayer.__init__(self, messages, *args, **kw)

class ESMTPManagedRelayer(ManagedRelayerMixin, relay.ESMTPRelayer):
    def __init__(self, messages, manager, *args, **kw):
        """
        @type messages: C{list} of C{str}
        @param messages: Filenames of messages to relay

        manager should support .notifySuccess, .notifyFailure
        and .notifyDone
        """
        ManagedRelayerMixin.__init__(self, manager)
        relay.ESMTPRelayer.__init__(self, messages, *args, **kw)

class SMTPManagedRelayerFactory(protocol.ClientFactory):
    protocol = SMTPManagedRelayer

    def __init__(self, messages, manager, *args, **kw):
        self.messages = messages
        self.manager = manager
        self.pArgs = args
        self.pKwArgs = kw

    def buildProtocol(self, addr):
        protocol = self.protocol(self.messages, self.manager, *self.pArgs,
            **self.pKwArgs)
        protocol.factory = self
        return protocol

    def clientConnectionFailed(self, connector, reason):
        """called when connection could not be made

        our manager should be notified that this happened,
        it might prefer some other host in that case"""
        self.manager.notifyNoConnection(self)
        self.manager.notifyDone(self)

class ESMTPManagedRelayerFactory(SMTPManagedRelayerFactory):
    protocol = ESMTPManagedRelayer

    def __init__(self, messages, manager, secret, contextFactory, *args, **kw):
        self.secret = secret
        self.contextFactory = contextFactory
        SMTPManagedRelayerFactory.__init__(self, messages, manager, *args, **kw)

    def buildProtocol(self, addr):
        s = self.secret and self.secret(addr)
        protocol = self.protocol(self.messages, self.manager, s,
            self.contextFactory, *self.pArgs, **self.pKwArgs)
        protocol.factory = self
        return protocol

class Queue:
    """A queue of ougoing emails."""

    noisy = True

    def __init__(self, directory):
        self.directory = directory
        self._init()

    def _init(self):
        self.n = 0
        self.waiting = {}
        self.relayed = {}
        self.readDirectory()

    def __getstate__(self):
        """(internal) delete volatile state"""
        return {'directory' : self.directory}

    def __setstate__(self, state):
        """(internal) restore volatile state"""
        self.__dict__.update(state)
        self._init()

    def readDirectory(self):
        """Read the messages directory.

        look for new messages.
        """
        for message in os.listdir(self.directory):
            # Skip non data files
            if message[-2:]!='-D':
                continue
            self.addMessage(message[:-2])

    def getWaiting(self):
        return self.waiting.keys()

    def hasWaiting(self):
        return len(self.waiting) > 0

    def getRelayed(self):
        return self.relayed.keys()

    def setRelaying(self, message):
        del self.waiting[message]
        self.relayed[message] = 1

    def setWaiting(self, message):
        del self.relayed[message]
        self.waiting[message] = 1

    def addMessage(self, message):
        if message not in self.relayed:
            self.waiting[message] = 1
            if self.noisy:
                log.msg('Set ' + message + ' waiting')

    def done(self, message):
        """Remove message to from queue."""
        message = os.path.basename(message)
        os.remove(self.getPath(message) + '-D')
        os.remove(self.getPath(message) + '-H')
        del self.relayed[message]

    def getPath(self, message):
        """Get the path in the filesystem of a message."""
        return os.path.join(self.directory, message)

    def getEnvelope(self, message):
        return pickle.load(self.getEnvelopeFile(message))

    def getEnvelopeFile(self, message):
        return open(os.path.join(self.directory, message+'-H'), 'rb')

    def createNewMessage(self):
        """Create a new message in the queue.

        Return a tuple - file-like object for headers, and ISMTPMessage.
        """
        fname = "%s_%s_%s_%s" % (os.getpid(), time.time(), self.n, id(self))
        self.n = self.n + 1
        headerFile = open(os.path.join(self.directory, fname+'-H'), 'wb')
        tempFilename = os.path.join(self.directory, fname+'-C')
        finalFilename = os.path.join(self.directory, fname+'-D')
        messageFile = open(tempFilename, 'wb')

        from twisted.mail.mail import FileMessage
        return headerFile,FileMessage(messageFile, tempFilename, finalFilename)


class _AttemptManager(object):
    """
    Manage the state of a single attempt to flush the relay queue.
    """
    def __init__(self, manager):
        self.manager = manager
        self._completionDeferreds = []


    def getCompletionDeferred(self):
        self._completionDeferreds.append(Deferred())
        return self._completionDeferreds[-1]


    def _finish(self, relay, message):
        self.manager.managed[relay].remove(os.path.basename(message))
        self.manager.queue.done(message)


    def notifySuccess(self, relay, message):
        """a relay sent a message successfully

        Mark it as sent in our lists
        """
        if self.manager.queue.noisy:
            log.msg("success sending %s, removing from queue" % message)
        self._finish(relay, message)


    def notifyFailure(self, relay, message):
        """Relaying the message has failed."""
        if self.manager.queue.noisy:
            log.msg("could not relay "+message)
        # Moshe - Bounce E-mail here
        # Be careful: if it's a bounced bounce, silently
        # discard it
        message = os.path.basename(message)
        fp = self.manager.queue.getEnvelopeFile(message)
        from_, to = pickle.load(fp)
        fp.close()
        from_, to, bounceMessage = bounce.generateBounce(open(self.manager.queue.getPath(message)+'-D'), from_, to)
        fp, outgoingMessage = self.manager.queue.createNewMessage()
        pickle.dump([from_, to], fp)
        fp.close()
        for line in bounceMessage.splitlines():
             outgoingMessage.lineReceived(line)
        outgoingMessage.eomReceived()
        self._finish(relay, self.manager.queue.getPath(message))


    def notifyDone(self, relay):
        """A relaying SMTP client is disconnected.

        unmark all pending messages under this relay's responsibility
        as being relayed, and remove the relay.
        """
        for message in self.manager.managed.get(relay, ()):
            if self.manager.queue.noisy:
                log.msg("Setting " + message + " waiting")
            self.manager.queue.setWaiting(message)
        try:
            del self.manager.managed[relay]
        except KeyError:
            pass
        notifications = self._completionDeferreds
        self._completionDeferreds = None
        for d in notifications:
            d.callback(None)


    def notifyNoConnection(self, relay):
        """Relaying SMTP client couldn't connect.

        Useful because it tells us our upstream server is unavailable.
        """
        # Back off a bit
        try:
            msgs = self.manager.managed[relay]
        except KeyError:
            log.msg("notifyNoConnection passed unknown relay!")
            return

        if self.manager.queue.noisy:
            log.msg("Backing off on delivery of " + str(msgs))
        def setWaiting(queue, messages):
            map(queue.setWaiting, messages)
        from twisted.internet import reactor
        reactor.callLater(30, setWaiting, self.manager.queue, msgs)
        del self.manager.managed[relay]



class SmartHostSMTPRelayingManager:
    """Manage SMTP Relayers

    Manage SMTP relayers, keeping track of the existing connections,
    each connection's responsibility in term of messages. Create
    more relayers if the need arises.

    Someone should press .checkState periodically

    @ivar fArgs: Additional positional arguments used to instantiate
    C{factory}.

    @ivar fKwArgs: Additional keyword arguments used to instantiate
    C{factory}.

    @ivar factory: A callable which returns a ClientFactory suitable for
    making SMTP connections.
    """

    factory = SMTPManagedRelayerFactory

    PORT = 25

    mxcalc = None

    def __init__(self, queue, maxConnections=2, maxMessagesPerConnection=10):
        """
        @type queue: Any implementor of C{IQueue}
        @param queue: The object used to queue messages on their way to
        delivery.

        @type maxConnections: C{int}
        @param maxConnections: The maximum number of SMTP connections to
        allow to be opened at any given time.

        @type maxMessagesPerConnection: C{int}
        @param maxMessagesPerConnection: The maximum number of messages a
        relayer will be given responsibility for.

        Default values are meant for a small box with 1-5 users.
        """
        self.maxConnections = maxConnections
        self.maxMessagesPerConnection = maxMessagesPerConnection
        self.managed = {} # SMTP clients we're managing
        self.queue = queue
        self.fArgs = ()
        self.fKwArgs = {}

    def __getstate__(self):
        """(internal) delete volatile state"""
        dct = self.__dict__.copy()
        del dct['managed']
        return dct

    def __setstate__(self, state):
        """(internal) restore volatile state"""
        self.__dict__.update(state)
        self.managed = {}

    def checkState(self):
        """
        Synchronize with the state of the world, and maybe launch a new
        relay.

        Call me periodically to check I am still up to date.

        @return: None or a Deferred which fires when all of the SMTP clients
        started by this call have disconnected.
        """
        self.queue.readDirectory()
        if (len(self.managed) >= self.maxConnections):
            return
        if  not self.queue.hasWaiting():
            return

        return self._checkStateMX()

    def _checkStateMX(self):
        nextMessages = self.queue.getWaiting()
        nextMessages.reverse()

        exchanges = {}
        for msg in nextMessages:
            from_, to = self.queue.getEnvelope(msg)
            name, addr = rfc822.parseaddr(to)
            parts = addr.split('@', 1)
            if len(parts) != 2:
                log.err("Illegal message destination: " + to)
                continue
            domain = parts[1]

            self.queue.setRelaying(msg)
            exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
            if len(exchanges) >= (self.maxConnections - len(self.managed)):
                break

        if self.mxcalc is None:
            self.mxcalc = MXCalculator()

        relays = []
        for (domain, msgs) in exchanges.iteritems():
            manager = _AttemptManager(self)
            factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
            self.managed[factory] = map(os.path.basename, msgs)
            relayAttemptDeferred = manager.getCompletionDeferred()
            connectSetupDeferred = self.mxcalc.getMX(domain)
            connectSetupDeferred.addCallback(lambda mx: str(mx.name))
            connectSetupDeferred.addCallback(self._cbExchange, self.PORT, factory)
            connectSetupDeferred.addErrback(lambda err: (relayAttemptDeferred.errback(err), err)[1])
            connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
            relays.append(relayAttemptDeferred)
        return DeferredList(relays)


    def _cbExchange(self, address, port, factory):
        from twisted.internet import reactor
        reactor.connectTCP(address, port, factory)

    def _ebExchange(self, failure, factory, domain):
        log.err('Error setting up managed relay factory for ' + domain)
        log.err(failure)
        def setWaiting(queue, messages):
            map(queue.setWaiting, messages)
        from twisted.internet import reactor
        reactor.callLater(30, setWaiting, self.queue, self.managed[factory])
        del self.managed[factory]

class SmartHostESMTPRelayingManager(SmartHostSMTPRelayingManager):
    factory = ESMTPManagedRelayerFactory

def _checkState(manager):
    manager.checkState()

def RelayStateHelper(manager, delay):
    return internet.TimerService(delay, _checkState, manager)



class CanonicalNameLoop(Exception):
    """
    When trying to look up the MX record for a host, a set of CNAME records was
    found which form a cycle and resolution was abandoned.
    """


class CanonicalNameChainTooLong(Exception):
    """
    When trying to look up the MX record for a host, too many CNAME records
    which point to other CNAME records were encountered and resolution was
    abandoned.
    """


class MXCalculator:
    """
    A utility for looking up mail exchange hosts and tracking whether they are
    working or not.

    @ivar clock: L{IReactorTime} provider which will be used to decide when to
        retry mail exchanges which have not been working.
    """
    timeOutBadMX = 60 * 60 # One hour
    fallbackToDomain = True

    def __init__(self, resolver=None, clock=None):
        self.badMXs = {}
        if resolver is None:
            from twisted.names.client import createResolver
            resolver = createResolver()
        self.resolver = resolver
        if clock is None:
            from twisted.internet import reactor as clock
        self.clock = clock


    def markBad(self, mx):
        """Indicate a given mx host is not currently functioning.

        @type mx: C{str}
        @param mx: The hostname of the host which is down.
        """
        self.badMXs[str(mx)] = self.clock.seconds() + self.timeOutBadMX

    def markGood(self, mx):
        """Indicate a given mx host is back online.

        @type mx: C{str}
        @param mx: The hostname of the host which is up.
        """
        try:
            del self.badMXs[mx]
        except KeyError:
            pass

    def getMX(self, domain, maximumCanonicalChainLength=3):
        """
        Find an MX record for the given domain.

        @type domain: C{str}
        @param domain: The domain name for which to look up an MX record.

        @type maximumCanonicalChainLength: C{int}
        @param maximumCanonicalChainLength: The maximum number of unique CNAME
            records to follow while looking up the MX record.

        @return: A L{Deferred} which is called back with a string giving the
            name in the found MX record or which is errbacked if no MX record
            can be found.
        """
        mailExchangeDeferred = self.resolver.lookupMailExchange(domain)
        mailExchangeDeferred.addCallback(self._filterRecords)
        mailExchangeDeferred.addCallback(
            self._cbMX, domain, maximumCanonicalChainLength)
        mailExchangeDeferred.addErrback(self._ebMX, domain)
        return mailExchangeDeferred


    def _filterRecords(self, records):
        """
        Convert a DNS response (a three-tuple of lists of RRHeaders) into a
        mapping from record names to lists of corresponding record payloads.
        """
        recordBag = {}
        for answer in records[0]:
            recordBag.setdefault(str(answer.name), []).append(answer.payload)
        return recordBag


    def _cbMX(self, answers, domain, cnamesLeft):
        """
        Try to find the MX host from the given DNS information.

        This will attempt to resolve CNAME results.  It can recognize loops
        and will give up on non-cyclic chains after a specified number of
        lookups.
        """
        # Do this import here so that relaymanager.py doesn't depend on
        # twisted.names, only MXCalculator will.
        from twisted.names import dns, error

        seenAliases = set()
        exchanges = []
        # Examine the answers for the domain we asked about
        pertinentRecords = answers.get(domain, [])
        while pertinentRecords:
            record = pertinentRecords.pop()

            # If it's a CNAME, we'll need to do some more processing
            if record.TYPE == dns.CNAME:

                # Remember that this name was an alias.
                seenAliases.add(domain)

                canonicalName = str(record.name)
                # See if we have some local records which might be relevant.
                if canonicalName in answers:

                    # Make sure it isn't a loop contained entirely within the
                    # results we have here.
                    if canonicalName in seenAliases:
                        return Failure(CanonicalNameLoop(record))

                    pertinentRecords = answers[canonicalName]
                    exchanges = []
                else:
                    if cnamesLeft:
                        # Request more information from the server.
                        return self.getMX(canonicalName, cnamesLeft - 1)
                    else:
                        # Give up.
                        return Failure(CanonicalNameChainTooLong(record))

            # If it's an MX, collect it.
            if record.TYPE == dns.MX:
                exchanges.append((record.preference, record))

        if exchanges:
            exchanges.sort()
            for (preference, record) in exchanges:
                host = str(record.name)
                if host not in self.badMXs:
                    return record
                t = self.clock.seconds() - self.badMXs[host]
                if t >= 0:
                    del self.badMXs[host]
                    return record
            return exchanges[0][1]
        else:
            # Treat no answers the same as an error - jump to the errback to try
            # to look up an A record.  This provides behavior described as a
            # special case in RFC 974 in the section headed I{Interpreting the
            # List of MX RRs}.
            return Failure(
                error.DNSNameError("No MX records for %r" % (domain,)))


    def _ebMX(self, failure, domain):
        from twisted.names import error, dns

        if self.fallbackToDomain:
            failure.trap(error.DNSNameError)
            log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,))

            # Alright, I admit, this is a bit icky.
            d = self.resolver.getHostByName(domain)
            def cbResolved(addr):
                return dns.Record_MX(name=addr)
            def ebResolved(err):
                err.trap(error.DNSNameError)
                raise DNSLookupError()
            d.addCallbacks(cbResolved, ebResolved)
            return d
        elif failure.check(error.DNSNameError):
            raise IOError("No MX found for %r" % (domain,))
        return failure