aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_tls.py
blob: 223d5f68d6f94f6a12f434d91a9704a382f07885 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{ITLSTransport}.
"""

__metaclass__ = type

import sys, operator

from zope.interface import implements

from twisted.internet.test.reactormixins import ReactorBuilder, EndpointCreator
from twisted.internet.protocol import ServerFactory, ClientFactory, Protocol
from twisted.internet.interfaces import (
    IReactorSSL, ITLSTransport, IStreamClientEndpoint)
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.endpoints import (
    SSL4ServerEndpoint, SSL4ClientEndpoint, TCP4ClientEndpoint)
from twisted.internet.error import ConnectionClosed
from twisted.internet.task import Cooperator
from twisted.trial.unittest import TestCase, SkipTest
from twisted.python.runtime import platform

from twisted.internet.test.test_core import ObjectModelIntegrationMixin
from twisted.internet.test.test_tcp import (
    StreamTransportTestsMixin, AbortConnectionMixin)
from twisted.internet.test.connectionmixins import ConnectionTestsMixin
from twisted.internet.test.connectionmixins import BrokenContextFactory

try:
    from OpenSSL.crypto import FILETYPE_PEM
except ImportError:
    FILETYPE_PEM = None
else:
    from twisted.internet.ssl import PrivateCertificate, KeyPair
    from twisted.internet.ssl import ClientContextFactory


class TLSMixin:
    requiredInterfaces = [IReactorSSL]

    if platform.isWindows():
        msg = (
            "For some reason, these reactors don't deal with SSL "
            "disconnection correctly on Windows.  See #3371.")
        skippedReactors = {
            "twisted.internet.glib2reactor.Glib2Reactor": msg,
            "twisted.internet.gtk2reactor.Gtk2Reactor": msg}


class ContextGeneratingMixin(object):
    _certificateText = (
        "-----BEGIN CERTIFICATE-----\n"
        "MIIDBjCCAm+gAwIBAgIBATANBgkqhkiG9w0BAQQFADB7MQswCQYDVQQGEwJTRzER\n"
        "MA8GA1UEChMITTJDcnlwdG8xFDASBgNVBAsTC00yQ3J5cHRvIENBMSQwIgYDVQQD\n"
        "ExtNMkNyeXB0byBDZXJ0aWZpY2F0ZSBNYXN0ZXIxHTAbBgkqhkiG9w0BCQEWDm5n\n"
        "cHNAcG9zdDEuY29tMB4XDTAwMDkxMDA5NTEzMFoXDTAyMDkxMDA5NTEzMFowUzEL\n"
        "MAkGA1UEBhMCU0cxETAPBgNVBAoTCE0yQ3J5cHRvMRIwEAYDVQQDEwlsb2NhbGhv\n"
        "c3QxHTAbBgkqhkiG9w0BCQEWDm5ncHNAcG9zdDEuY29tMFwwDQYJKoZIhvcNAQEB\n"
        "BQADSwAwSAJBAKy+e3dulvXzV7zoTZWc5TzgApr8DmeQHTYC8ydfzH7EECe4R1Xh\n"
        "5kwIzOuuFfn178FBiS84gngaNcrFi0Z5fAkCAwEAAaOCAQQwggEAMAkGA1UdEwQC\n"
        "MAAwLAYJYIZIAYb4QgENBB8WHU9wZW5TU0wgR2VuZXJhdGVkIENlcnRpZmljYXRl\n"
        "MB0GA1UdDgQWBBTPhIKSvnsmYsBVNWjj0m3M2z0qVTCBpQYDVR0jBIGdMIGagBT7\n"
        "hyNp65w6kxXlxb8pUU/+7Sg4AaF/pH0wezELMAkGA1UEBhMCU0cxETAPBgNVBAoT\n"
        "CE0yQ3J5cHRvMRQwEgYDVQQLEwtNMkNyeXB0byBDQTEkMCIGA1UEAxMbTTJDcnlw\n"
        "dG8gQ2VydGlmaWNhdGUgTWFzdGVyMR0wGwYJKoZIhvcNAQkBFg5uZ3BzQHBvc3Qx\n"
        "LmNvbYIBADANBgkqhkiG9w0BAQQFAAOBgQA7/CqT6PoHycTdhEStWNZde7M/2Yc6\n"
        "BoJuVwnW8YxGO8Sn6UJ4FeffZNcYZddSDKosw8LtPOeWoK3JINjAk5jiPQ2cww++\n"
        "7QGG/g5NDjxFZNDJP1dGiLAxPW6JXwov4v0FmdzfLOZ01jDcgQQZqEpYlgpuI5JE\n"
        "WUQ9Ho4EzbYCOQ==\n"
        "-----END CERTIFICATE-----\n")

    _privateKeyText = (
        "-----BEGIN RSA PRIVATE KEY-----\n"
        "MIIBPAIBAAJBAKy+e3dulvXzV7zoTZWc5TzgApr8DmeQHTYC8ydfzH7EECe4R1Xh\n"
        "5kwIzOuuFfn178FBiS84gngaNcrFi0Z5fAkCAwEAAQJBAIqm/bz4NA1H++Vx5Ewx\n"
        "OcKp3w19QSaZAwlGRtsUxrP7436QjnREM3Bm8ygU11BjkPVmtrKm6AayQfCHqJoT\n"
        "ZIECIQDW0BoMoL0HOYM/mrTLhaykYAVqgIeJsPjvkEhTFXWBuQIhAM3deFAvWNu4\n"
        "nklUQ37XsCT2c9tmNt1LAT+slG2JOTTRAiAuXDtC/m3NYVwyHfFm+zKHRzHkClk2\n"
        "HjubeEgjpj32AQIhAJqMGTaZVOwevTXvvHwNEH+vRWsAYU/gbx+OQB+7VOcBAiEA\n"
        "oolb6NMg/R3enNPvS1O4UU1H8wpaF77L4yiSWlE0p4w=\n"
        "-----END RSA PRIVATE KEY-----\n")


    def getServerContext(self):
        """
        Return a new SSL context suitable for use in a test server.
        """
        cert = PrivateCertificate.load(
            self._certificateText,
            KeyPair.load(self._privateKeyText, FILETYPE_PEM),
            FILETYPE_PEM)
        return cert.options()


    def getClientContext(self):
        return ClientContextFactory()



class StartTLSClientEndpoint(object):
    """
    An endpoint which wraps another one and adds a TLS layer immediately when
    connections are set up.

    @ivar wrapped: A L{IStreamClientEndpoint} provider which will be used to
        really set up connections.

    @ivar contextFactory: A L{ContextFactory} to use to do TLS.
    """
    implements(IStreamClientEndpoint)

    def __init__(self, wrapped, contextFactory):
        self.wrapped = wrapped
        self.contextFactory = contextFactory


    def connect(self, factory):
        """
        Establish a connection using a protocol build by C{factory} and
        immediately start TLS on it.  Return a L{Deferred} which fires with the
        protocol instance.
        """
        # This would be cleaner when we have ITransport.switchProtocol, which
        # will be added with ticket #3204:
        class WrapperFactory(ServerFactory):
            def buildProtocol(wrapperSelf, addr):
                protocol = factory.buildProtocol(addr)
                def connectionMade(orig=protocol.connectionMade):
                    protocol.transport.startTLS(self.contextFactory)
                    orig()
                protocol.connectionMade = connectionMade
                return protocol

        return self.wrapped.connect(WrapperFactory())



class StartTLSClientCreator(EndpointCreator, ContextGeneratingMixin):
    """
    Create L{ITLSTransport.startTLS} endpoint for the client, and normal SSL
    for server just because it's easier.
    """
    def server(self, reactor):
        """
        Construct an SSL server endpoint.  This should be be constructing a TCP
        server endpoint which immediately calls C{startTLS} instead, but that
        is hard.
        """
        return SSL4ServerEndpoint(reactor, 0, self.getServerContext())


    def client(self, reactor, serverAddress):
        """
        Construct a TCP client endpoint wrapped to immediately start TLS.
        """
        return StartTLSClientEndpoint(
            TCP4ClientEndpoint(
                reactor, '127.0.0.1', serverAddress.port),
            ClientContextFactory())



class BadContextTestsMixin(object):
    """
    Mixin for L{ReactorBuilder} subclasses which defines a helper for testing
    the handling of broken context factories.
    """
    def _testBadContext(self, useIt):
        """
        Assert that the exception raised by a broken context factory's
        C{getContext} method is raised by some reactor method.  If it is not, an
        exception will be raised to fail the test.

        @param useIt: A two-argument callable which will be called with a
            reactor and a broken context factory and which is expected to raise
            the same exception as the broken context factory's C{getContext}
            method.
        """
        reactor = self.buildReactor()
        exc = self.assertRaises(
            ValueError, useIt, reactor, BrokenContextFactory())
        self.assertEqual(BrokenContextFactory.message, str(exc))



class StartTLSClientTestsMixin(TLSMixin, ReactorBuilder, ConnectionTestsMixin):
    """
    Tests for TLS connections established using L{ITLSTransport.startTLS} (as
    opposed to L{IReactorSSL.connectSSL} or L{IReactorSSL.listenSSL}).
    """
    endpoints = StartTLSClientCreator()



class SSLCreator(EndpointCreator, ContextGeneratingMixin):
    """
    Create SSL endpoints.
    """
    def server(self, reactor):
        """
        Create an SSL server endpoint on a TCP/IP-stack allocated port.
        """
        return SSL4ServerEndpoint(reactor, 0, self.getServerContext())


    def client(self, reactor, serverAddress):
        """
        Create an SSL client endpoint which will connect localhost on
        the port given by C{serverAddress}.

        @type serverAddress: L{IPv4Address}
        """
        return SSL4ClientEndpoint(
            reactor, '127.0.0.1', serverAddress.port,
            ClientContextFactory())


class SSLClientTestsMixin(TLSMixin, ReactorBuilder, ContextGeneratingMixin,
                          ConnectionTestsMixin, BadContextTestsMixin):
    """
    Mixin defining tests relating to L{ITLSTransport}.
    """
    endpoints = SSLCreator()

    def test_badContext(self):
        """
        If the context factory passed to L{IReactorSSL.connectSSL} raises an
        exception from its C{getContext} method, that exception is raised by
        L{IReactorSSL.connectSSL}.
        """
        def useIt(reactor, contextFactory):
            return reactor.connectSSL(
                "127.0.0.1", 1234, ClientFactory(), contextFactory)
        self._testBadContext(useIt)


    def test_disconnectAfterWriteAfterStartTLS(self):
        """
        L{ITCPTransport.loseConnection} ends a connection which was set up with
        L{ITLSTransport.startTLS} and which has recently been written to.  This
        is intended to verify that a socket send error masked by the TLS
        implementation doesn't prevent the connection from being reported as
        closed.
        """
        class ShortProtocol(Protocol):
            def connectionMade(self):
                if not ITLSTransport.providedBy(self.transport):
                    # Functionality isn't available to be tested.
                    finished = self.factory.finished
                    self.factory.finished = None
                    finished.errback(SkipTest("No ITLSTransport support"))
                    return

                # Switch the transport to TLS.
                self.transport.startTLS(self.factory.context)
                # Force TLS to really get negotiated.  If nobody talks, nothing
                # will happen.
                self.transport.write("x")

            def dataReceived(self, data):
                # Stuff some bytes into the socket.  This mostly has the effect
                # of causing the next write to fail with ENOTCONN or EPIPE.
                # With the pyOpenSSL implementation of ITLSTransport, the error
                # is swallowed outside of the control of Twisted.
                self.transport.write("y")
                # Now close the connection, which requires a TLS close alert to
                # be sent.
                self.transport.loseConnection()

            def connectionLost(self, reason):
                # This is the success case.  The client and the server want to
                # get here.
                finished = self.factory.finished
                if finished is not None:
                    self.factory.finished = None
                    finished.callback(reason)

        reactor = self.buildReactor()

        serverFactory = ServerFactory()
        serverFactory.finished = Deferred()
        serverFactory.protocol = ShortProtocol
        serverFactory.context = self.getServerContext()

        clientFactory = ClientFactory()
        clientFactory.finished = Deferred()
        clientFactory.protocol = ShortProtocol
        clientFactory.context = self.getClientContext()
        clientFactory.context.method = serverFactory.context.method

        lostConnectionResults = []
        finished = DeferredList(
            [serverFactory.finished, clientFactory.finished],
            consumeErrors=True)
        def cbFinished(results):
            lostConnectionResults.extend([results[0][1], results[1][1]])
        finished.addCallback(cbFinished)

        port = reactor.listenTCP(0, serverFactory, interface='127.0.0.1')
        self.addCleanup(port.stopListening)

        connector = reactor.connectTCP(
            port.getHost().host, port.getHost().port, clientFactory)
        self.addCleanup(connector.disconnect)

        finished.addCallback(lambda ign: reactor.stop())
        self.runReactor(reactor)
        lostConnectionResults[0].trap(ConnectionClosed)
        lostConnectionResults[1].trap(ConnectionClosed)



class TLSPortTestsBuilder(TLSMixin, ContextGeneratingMixin,
                          ObjectModelIntegrationMixin, BadContextTestsMixin,
                          StreamTransportTestsMixin, ReactorBuilder):
    """
    Tests for L{IReactorSSL.listenSSL}
    """
    def getListeningPort(self, reactor, factory):
        """
        Get a TLS port from a reactor.
        """
        return reactor.listenSSL(0, factory, self.getServerContext())


    def getExpectedStartListeningLogMessage(self, port, factory):
        """
        Get the message expected to be logged when a TLS port starts listening.
        """
        return "%s (TLS) starting on %d" % (factory, port.getHost().port)


    def getExpectedConnectionLostLogMsg(self, port):
        """
        Get the expected connection lost message for a TLS port.
        """
        return "(TLS Port %s Closed)" % (port.getHost().port,)


    def test_badContext(self):
        """
        If the context factory passed to L{IReactorSSL.listenSSL} raises an
        exception from its C{getContext} method, that exception is raised by
        L{IReactorSSL.listenSSL}.
        """
        def useIt(reactor, contextFactory):
            return reactor.listenSSL(0, ServerFactory(), contextFactory)
        self._testBadContext(useIt)



globals().update(SSLClientTestsMixin.makeTestCaseClasses())
globals().update(StartTLSClientTestsMixin.makeTestCaseClasses())
globals().update(TLSPortTestsBuilder().makeTestCaseClasses())



class AbortSSLConnectionTest(ReactorBuilder, AbortConnectionMixin, ContextGeneratingMixin):
    """
    C{abortConnection} tests using SSL.
    """
    requiredInterfaces = (IReactorSSL,)
    endpoints = SSLCreator()

    def buildReactor(self):
        reactor = ReactorBuilder.buildReactor(self)
        try:
            from twisted.protocols import tls
        except ImportError:
            return reactor

        # Patch twisted.protocols.tls to use this reactor, until we get
        # around to fixing #5206, or the TLS code uses an explicit reactor:
        cooperator = Cooperator(
            scheduler=lambda x: reactor.callLater(0.00001, x))
        self.patch(tls, "cooperate", cooperator.cooperate)
        return reactor


    def setUp(self):
        if FILETYPE_PEM is None:
            raise SkipTest("OpenSSL not available.")

globals().update(AbortSSLConnectionTest.makeTestCaseClasses())

class OldTLSDeprecationTest(TestCase):
    """
    Tests for the deprecation of L{twisted.internet._oldtls}, the implementation
    module for L{IReactorSSL} used when only an old version of pyOpenSSL is
    available.
    """
    def test_warning(self):
        """
        The use of L{twisted.internet._oldtls} is deprecated, and emits a
        L{DeprecationWarning}.
        """
        # Since _oldtls depends on OpenSSL, just skip this test if it isn't
        # installed on the system.  Faking it would be error prone.
        try:
            import OpenSSL
        except ImportError:
            raise SkipTest("OpenSSL not available.")

        # Change the apparent version of OpenSSL to one support for which is
        # deprecated.  And have it change back again after the test.
        self.patch(OpenSSL, '__version__', '0.5')

        # If the module was already imported, the import statement below won't
        # execute its top-level code.  Take it out of sys.modules so the import
        # system re-evaluates it.  Arrange to put the original back afterwards.
        # Also handle the case where it hasn't yet been imported.
        try:
            oldtls = sys.modules['twisted.internet._oldtls']
        except KeyError:
            self.addCleanup(sys.modules.pop, 'twisted.internet._oldtls')
        else:
            del sys.modules['twisted.internet._oldtls']
            self.addCleanup(
                operator.setitem, sys.modules, 'twisted.internet._oldtls',
                oldtls)

        # The actual test.
        import twisted.internet._oldtls
        warnings = self.flushWarnings()
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
        self.assertEqual(
            warnings[0]['message'],
            "Support for pyOpenSSL 0.5 is deprecated.  "
            "Upgrade to pyOpenSSL 0.10 or newer.")