diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ssl.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ssl.py | 726 |
1 files changed, 0 insertions, 726 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ssl.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ssl.py deleted file mode 100755 index 6d1a3ec7..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_ssl.py +++ /dev/null @@ -1,726 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for twisted SSL support. -""" - -from twisted.trial import unittest -from twisted.internet import protocol, reactor, interfaces, defer -from twisted.internet.error import ConnectionDone -from twisted.protocols import basic -from twisted.python import util -from twisted.python.runtime import platform -from twisted.test.test_tcp import ProperlyCloseFilesMixin - -import os, errno - -try: - from OpenSSL import SSL, crypto - from twisted.internet import ssl - from twisted.test.ssl_helpers import ClientTLSContext -except ImportError: - def _noSSL(): - # ugh, make pyflakes happy. - global SSL - global ssl - SSL = ssl = None - _noSSL() - -try: - from twisted.protocols import tls as newTLS -except ImportError: - # Assuming SSL exists, we're using old version in reactor (i.e. non-protocol) - newTLS = None - -certPath = util.sibpath(__file__, "server.pem") - - - -class UnintelligentProtocol(basic.LineReceiver): - """ - @ivar deferred: a deferred that will fire at connection lost. - @type deferred: L{defer.Deferred} - - @cvar pretext: text sent before TLS is set up. - @type pretext: C{str} - - @cvar posttext: text sent after TLS is set up. - @type posttext: C{str} - """ - pretext = [ - "first line", - "last thing before tls starts", - "STARTTLS"] - - posttext = [ - "first thing after tls started", - "last thing ever"] - - def __init__(self): - self.deferred = defer.Deferred() - - - def connectionMade(self): - for l in self.pretext: - self.sendLine(l) - - - def lineReceived(self, line): - if line == "READY": - self.transport.startTLS(ClientTLSContext(), self.factory.client) - for l in self.posttext: - self.sendLine(l) - self.transport.loseConnection() - - - def connectionLost(self, reason): - self.deferred.callback(None) - - - -class LineCollector(basic.LineReceiver): - """ - @ivar deferred: a deferred that will fire at connection lost. - @type deferred: L{defer.Deferred} - - @ivar doTLS: whether the protocol is initiate TLS or not. - @type doTLS: C{bool} - - @ivar fillBuffer: if set to True, it will send lots of data once - C{STARTTLS} is received. - @type fillBuffer: C{bool} - """ - - def __init__(self, doTLS, fillBuffer=False): - self.doTLS = doTLS - self.fillBuffer = fillBuffer - self.deferred = defer.Deferred() - - - def connectionMade(self): - self.factory.rawdata = '' - self.factory.lines = [] - - - def lineReceived(self, line): - self.factory.lines.append(line) - if line == 'STARTTLS': - if self.fillBuffer: - for x in range(500): - self.sendLine('X' * 1000) - self.sendLine('READY') - if self.doTLS: - ctx = ServerTLSContext( - privateKeyFileName=certPath, - certificateFileName=certPath, - ) - self.transport.startTLS(ctx, self.factory.server) - else: - self.setRawMode() - - - def rawDataReceived(self, data): - self.factory.rawdata += data - self.transport.loseConnection() - - - def connectionLost(self, reason): - self.deferred.callback(None) - - - -class SingleLineServerProtocol(protocol.Protocol): - """ - A protocol that sends a single line of data at C{connectionMade}. - """ - - def connectionMade(self): - self.transport.write("+OK <some crap>\r\n") - self.transport.getPeerCertificate() - - - -class RecordingClientProtocol(protocol.Protocol): - """ - @ivar deferred: a deferred that will fire with first received content. - @type deferred: L{defer.Deferred} - """ - - def __init__(self): - self.deferred = defer.Deferred() - - - def connectionMade(self): - self.transport.getPeerCertificate() - - - def dataReceived(self, data): - self.deferred.callback(data) - - - -class ImmediatelyDisconnectingProtocol(protocol.Protocol): - """ - A protocol that disconnect immediately on connection. It fires the - C{connectionDisconnected} deferred of its factory on connetion lost. - """ - - def connectionMade(self): - self.transport.loseConnection() - - - def connectionLost(self, reason): - self.factory.connectionDisconnected.callback(None) - - - -def generateCertificateObjects(organization, organizationalUnit): - """ - Create a certificate for given C{organization} and C{organizationalUnit}. - - @return: a tuple of (key, request, certificate) objects. - """ - pkey = crypto.PKey() - pkey.generate_key(crypto.TYPE_RSA, 512) - req = crypto.X509Req() - subject = req.get_subject() - subject.O = organization - subject.OU = organizationalUnit - req.set_pubkey(pkey) - req.sign(pkey, "md5") - - # Here comes the actual certificate - cert = crypto.X509() - cert.set_serial_number(1) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(60) # Testing certificates need not be long lived - cert.set_issuer(req.get_subject()) - cert.set_subject(req.get_subject()) - cert.set_pubkey(req.get_pubkey()) - cert.sign(pkey, "md5") - - return pkey, req, cert - - - -def generateCertificateFiles(basename, organization, organizationalUnit): - """ - Create certificate files key, req and cert prefixed by C{basename} for - given C{organization} and C{organizationalUnit}. - """ - pkey, req, cert = generateCertificateObjects(organization, organizationalUnit) - - for ext, obj, dumpFunc in [ - ('key', pkey, crypto.dump_privatekey), - ('req', req, crypto.dump_certificate_request), - ('cert', cert, crypto.dump_certificate)]: - fName = os.extsep.join((basename, ext)) - fObj = file(fName, 'w') - fObj.write(dumpFunc(crypto.FILETYPE_PEM, obj)) - fObj.close() - - - -class ContextGeneratingMixin: - """ - Offer methods to create L{ssl.DefaultOpenSSLContextFactory} for both client - and server. - - @ivar clientBase: prefix of client certificate files. - @type clientBase: C{str} - - @ivar serverBase: prefix of server certificate files. - @type serverBase: C{str} - - @ivar clientCtxFactory: a generated context factory to be used in - C{reactor.connectSSL}. - @type clientCtxFactory: L{ssl.DefaultOpenSSLContextFactory} - - @ivar serverCtxFactory: a generated context factory to be used in - C{reactor.listenSSL}. - @type serverCtxFactory: L{ssl.DefaultOpenSSLContextFactory} - """ - - def makeContextFactory(self, org, orgUnit, *args, **kwArgs): - base = self.mktemp() - generateCertificateFiles(base, org, orgUnit) - serverCtxFactory = ssl.DefaultOpenSSLContextFactory( - os.extsep.join((base, 'key')), - os.extsep.join((base, 'cert')), - *args, **kwArgs) - - return base, serverCtxFactory - - - def setupServerAndClient(self, clientArgs, clientKwArgs, serverArgs, - serverKwArgs): - self.clientBase, self.clientCtxFactory = self.makeContextFactory( - *clientArgs, **clientKwArgs) - self.serverBase, self.serverCtxFactory = self.makeContextFactory( - *serverArgs, **serverKwArgs) - - - -if SSL is not None: - class ServerTLSContext(ssl.DefaultOpenSSLContextFactory): - """ - A context factory with a default method set to L{SSL.TLSv1_METHOD}. - """ - isClient = False - - def __init__(self, *args, **kw): - kw['sslmethod'] = SSL.TLSv1_METHOD - ssl.DefaultOpenSSLContextFactory.__init__(self, *args, **kw) - - - -class StolenTCPTestCase(ProperlyCloseFilesMixin, unittest.TestCase): - """ - For SSL transports, test many of the same things which are tested for - TCP transports. - """ - - def createServer(self, address, portNumber, factory): - """ - Create an SSL server with a certificate using L{IReactorSSL.listenSSL}. - """ - cert = ssl.PrivateCertificate.loadPEM(file(certPath).read()) - contextFactory = cert.options() - return reactor.listenSSL( - portNumber, factory, contextFactory, interface=address) - - - def connectClient(self, address, portNumber, clientCreator): - """ - Create an SSL client using L{IReactorSSL.connectSSL}. - """ - contextFactory = ssl.CertificateOptions() - return clientCreator.connectSSL(address, portNumber, contextFactory) - - - def getHandleExceptionType(self): - """ - Return L{SSL.Error} as the expected error type which will be raised by - a write to the L{OpenSSL.SSL.Connection} object after it has been - closed. - """ - return SSL.Error - - - def getHandleErrorCode(self): - """ - Return the argument L{SSL.Error} will be constructed with for this - case. This is basically just a random OpenSSL implementation detail. - It would be better if this test worked in a way which did not require - this. - """ - # Windows 2000 SP 4 and Windows XP SP 2 give back WSAENOTSOCK for - # SSL.Connection.write for some reason. The twisted.protocols.tls - # implementation of IReactorSSL doesn't suffer from this imprecation, - # though, since it is isolated from the Windows I/O layer (I suppose?). - - # If test_properlyCloseFiles waited for the SSL handshake to complete - # and performed an orderly shutdown, then this would probably be a - # little less weird: writing to a shutdown SSL connection has a more - # well-defined failure mode (or at least it should). - - # So figure out if twisted.protocols.tls is in use. If it can be - # imported, it should be. - try: - import twisted.protocols.tls - except ImportError: - # It isn't available, so we expect WSAENOTSOCK if we're on Windows. - if platform.getType() == 'win32': - return errno.WSAENOTSOCK - - # Otherwise, we expect an error about how we tried to write to a - # shutdown connection. This is terribly implementation-specific. - return [('SSL routines', 'SSL_write', 'protocol is shutdown')] - - - -class TLSTestCase(unittest.TestCase): - """ - Tests for startTLS support. - - @ivar fillBuffer: forwarded to L{LineCollector.fillBuffer} - @type fillBuffer: C{bool} - """ - fillBuffer = False - - clientProto = None - serverProto = None - - - def tearDown(self): - if self.clientProto.transport is not None: - self.clientProto.transport.loseConnection() - if self.serverProto.transport is not None: - self.serverProto.transport.loseConnection() - - - def _runTest(self, clientProto, serverProto, clientIsServer=False): - """ - Helper method to run TLS tests. - - @param clientProto: protocol instance attached to the client - connection. - @param serverProto: protocol instance attached to the server - connection. - @param clientIsServer: flag indicated if client should initiate - startTLS instead of server. - - @return: a L{defer.Deferred} that will fire when both connections are - lost. - """ - self.clientProto = clientProto - cf = self.clientFactory = protocol.ClientFactory() - cf.protocol = lambda: clientProto - if clientIsServer: - cf.server = False - else: - cf.client = True - - self.serverProto = serverProto - sf = self.serverFactory = protocol.ServerFactory() - sf.protocol = lambda: serverProto - if clientIsServer: - sf.client = False - else: - sf.server = True - - port = reactor.listenTCP(0, sf, interface="127.0.0.1") - self.addCleanup(port.stopListening) - - reactor.connectTCP('127.0.0.1', port.getHost().port, cf) - - return defer.gatherResults([clientProto.deferred, serverProto.deferred]) - - - def test_TLS(self): - """ - Test for server and client startTLS: client should received data both - before and after the startTLS. - """ - def check(ignore): - self.assertEqual( - self.serverFactory.lines, - UnintelligentProtocol.pretext + UnintelligentProtocol.posttext - ) - d = self._runTest(UnintelligentProtocol(), - LineCollector(True, self.fillBuffer)) - return d.addCallback(check) - - - def test_unTLS(self): - """ - Test for server startTLS not followed by a startTLS in client: the data - received after server startTLS should be received as raw. - """ - def check(ignored): - self.assertEqual( - self.serverFactory.lines, - UnintelligentProtocol.pretext - ) - self.failUnless(self.serverFactory.rawdata, - "No encrypted bytes received") - d = self._runTest(UnintelligentProtocol(), - LineCollector(False, self.fillBuffer)) - return d.addCallback(check) - - - def test_backwardsTLS(self): - """ - Test startTLS first initiated by client. - """ - def check(ignored): - self.assertEqual( - self.clientFactory.lines, - UnintelligentProtocol.pretext + UnintelligentProtocol.posttext - ) - d = self._runTest(LineCollector(True, self.fillBuffer), - UnintelligentProtocol(), True) - return d.addCallback(check) - - - -class SpammyTLSTestCase(TLSTestCase): - """ - Test TLS features with bytes sitting in the out buffer. - """ - fillBuffer = True - - - -class BufferingTestCase(unittest.TestCase): - serverProto = None - clientProto = None - - - def tearDown(self): - if self.serverProto.transport is not None: - self.serverProto.transport.loseConnection() - if self.clientProto.transport is not None: - self.clientProto.transport.loseConnection() - - - def test_openSSLBuffering(self): - serverProto = self.serverProto = SingleLineServerProtocol() - clientProto = self.clientProto = RecordingClientProtocol() - - server = protocol.ServerFactory() - client = self.client = protocol.ClientFactory() - - server.protocol = lambda: serverProto - client.protocol = lambda: clientProto - - sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath) - cCTX = ssl.ClientContextFactory() - - port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1') - self.addCleanup(port.stopListening) - - reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX) - - return clientProto.deferred.addCallback( - self.assertEqual, "+OK <some crap>\r\n") - - - -class ConnectionLostTestCase(unittest.TestCase, ContextGeneratingMixin): - """ - SSL connection closing tests. - """ - - def testImmediateDisconnect(self): - org = "twisted.test.test_ssl" - self.setupServerAndClient( - (org, org + ", client"), {}, - (org, org + ", server"), {}) - - # Set up a server, connect to it with a client, which should work since our verifiers - # allow anything, then disconnect. - serverProtocolFactory = protocol.ServerFactory() - serverProtocolFactory.protocol = protocol.Protocol - self.serverPort = serverPort = reactor.listenSSL(0, - serverProtocolFactory, self.serverCtxFactory) - - clientProtocolFactory = protocol.ClientFactory() - clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol - clientProtocolFactory.connectionDisconnected = defer.Deferred() - clientConnector = reactor.connectSSL('127.0.0.1', - serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory) - - return clientProtocolFactory.connectionDisconnected.addCallback( - lambda ignoredResult: self.serverPort.stopListening()) - - - def test_bothSidesLoseConnection(self): - """ - Both sides of SSL connection close connection; the connections should - close cleanly, and only after the underlying TCP connection has - disconnected. - """ - class CloseAfterHandshake(protocol.Protocol): - def __init__(self): - self.done = defer.Deferred() - - def connectionMade(self): - self.transport.write("a") - - def dataReceived(self, data): - # If we got data, handshake is over: - self.transport.loseConnection() - - def connectionLost(self2, reason): - self2.done.errback(reason) - del self2.done - - org = "twisted.test.test_ssl" - self.setupServerAndClient( - (org, org + ", client"), {}, - (org, org + ", server"), {}) - - serverProtocol = CloseAfterHandshake() - serverProtocolFactory = protocol.ServerFactory() - serverProtocolFactory.protocol = lambda: serverProtocol - serverPort = reactor.listenSSL(0, - serverProtocolFactory, self.serverCtxFactory) - self.addCleanup(serverPort.stopListening) - - clientProtocol = CloseAfterHandshake() - clientProtocolFactory = protocol.ClientFactory() - clientProtocolFactory.protocol = lambda: clientProtocol - clientConnector = reactor.connectSSL('127.0.0.1', - serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory) - - def checkResult(failure): - failure.trap(ConnectionDone) - return defer.gatherResults( - [clientProtocol.done.addErrback(checkResult), - serverProtocol.done.addErrback(checkResult)]) - - if newTLS is None: - test_bothSidesLoseConnection.skip = "Old SSL code doesn't always close cleanly." - - - def testFailedVerify(self): - org = "twisted.test.test_ssl" - self.setupServerAndClient( - (org, org + ", client"), {}, - (org, org + ", server"), {}) - - def verify(*a): - return False - self.clientCtxFactory.getContext().set_verify(SSL.VERIFY_PEER, verify) - - serverConnLost = defer.Deferred() - serverProtocol = protocol.Protocol() - serverProtocol.connectionLost = serverConnLost.callback - serverProtocolFactory = protocol.ServerFactory() - serverProtocolFactory.protocol = lambda: serverProtocol - self.serverPort = serverPort = reactor.listenSSL(0, - serverProtocolFactory, self.serverCtxFactory) - - clientConnLost = defer.Deferred() - clientProtocol = protocol.Protocol() - clientProtocol.connectionLost = clientConnLost.callback - clientProtocolFactory = protocol.ClientFactory() - clientProtocolFactory.protocol = lambda: clientProtocol - clientConnector = reactor.connectSSL('127.0.0.1', - serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory) - - dl = defer.DeferredList([serverConnLost, clientConnLost], consumeErrors=True) - return dl.addCallback(self._cbLostConns) - - - def _cbLostConns(self, results): - (sSuccess, sResult), (cSuccess, cResult) = results - - self.failIf(sSuccess) - self.failIf(cSuccess) - - acceptableErrors = [SSL.Error] - - # Rather than getting a verification failure on Windows, we are getting - # a connection failure. Without something like sslverify proxying - # in-between we can't fix up the platform's errors, so let's just - # specifically say it is only OK in this one case to keep the tests - # passing. Normally we'd like to be as strict as possible here, so - # we're not going to allow this to report errors incorrectly on any - # other platforms. - - if platform.isWindows(): - from twisted.internet.error import ConnectionLost - acceptableErrors.append(ConnectionLost) - - sResult.trap(*acceptableErrors) - cResult.trap(*acceptableErrors) - - return self.serverPort.stopListening() - - - -class FakeContext: - """ - L{OpenSSL.SSL.Context} double which can more easily be inspected. - """ - def __init__(self, method): - self._method = method - self._options = 0 - - - def set_options(self, options): - self._options |= options - - - def use_certificate_file(self, fileName): - pass - - - def use_privatekey_file(self, fileName): - pass - - - -class DefaultOpenSSLContextFactoryTests(unittest.TestCase): - """ - Tests for L{ssl.DefaultOpenSSLContextFactory}. - """ - def setUp(self): - # pyOpenSSL Context objects aren't introspectable enough. Pass in - # an alternate context factory so we can inspect what is done to it. - self.contextFactory = ssl.DefaultOpenSSLContextFactory( - certPath, certPath, _contextFactory=FakeContext) - self.context = self.contextFactory.getContext() - - - def test_method(self): - """ - L{ssl.DefaultOpenSSLContextFactory.getContext} returns an SSL context - which can use SSLv3 or TLSv1 but not SSLv2. - """ - # SSLv23_METHOD allows SSLv2, SSLv3, or TLSv1 - self.assertEqual(self.context._method, SSL.SSLv23_METHOD) - - # And OP_NO_SSLv2 disables the SSLv2 support. - self.assertTrue(self.context._options & SSL.OP_NO_SSLv2) - - # Make sure SSLv3 and TLSv1 aren't disabled though. - self.assertFalse(self.context._options & SSL.OP_NO_SSLv3) - self.assertFalse(self.context._options & SSL.OP_NO_TLSv1) - - - def test_missingCertificateFile(self): - """ - Instantiating L{ssl.DefaultOpenSSLContextFactory} with a certificate - filename which does not identify an existing file results in the - initializer raising L{OpenSSL.SSL.Error}. - """ - self.assertRaises( - SSL.Error, - ssl.DefaultOpenSSLContextFactory, certPath, self.mktemp()) - - - def test_missingPrivateKeyFile(self): - """ - Instantiating L{ssl.DefaultOpenSSLContextFactory} with a private key - filename which does not identify an existing file results in the - initializer raising L{OpenSSL.SSL.Error}. - """ - self.assertRaises( - SSL.Error, - ssl.DefaultOpenSSLContextFactory, self.mktemp(), certPath) - - - -class ClientContextFactoryTests(unittest.TestCase): - """ - Tests for L{ssl.ClientContextFactory}. - """ - def setUp(self): - self.contextFactory = ssl.ClientContextFactory() - self.contextFactory._contextFactory = FakeContext - self.context = self.contextFactory.getContext() - - - def test_method(self): - """ - L{ssl.ClientContextFactory.getContext} returns a context which can use - SSLv3 or TLSv1 but not SSLv2. - """ - self.assertEqual(self.context._method, SSL.SSLv23_METHOD) - self.assertTrue(self.context._options & SSL.OP_NO_SSLv2) - self.assertFalse(self.context._options & SSL.OP_NO_SSLv3) - self.assertFalse(self.context._options & SSL.OP_NO_TLSv1) - - - -if interfaces.IReactorSSL(reactor, None) is None: - for tCase in [StolenTCPTestCase, TLSTestCase, SpammyTLSTestCase, - BufferingTestCase, ConnectionLostTestCase, - DefaultOpenSSLContextFactoryTests, - ClientContextFactoryTests]: - tCase.skip = "Reactor does not support SSL, cannot run SSL tests" - |