diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/iosim.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/iosim.py | 270 |
1 files changed, 0 insertions, 270 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/iosim.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/iosim.py deleted file mode 100755 index afa80f9f..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/iosim.py +++ /dev/null @@ -1,270 +0,0 @@ -# -*- test-case-name: twisted.test.test_amp.TLSTest -*- -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Utilities and helpers for simulating a network -""" - -import itertools - -try: - from OpenSSL.SSL import Error as NativeOpenSSLError -except ImportError: - pass - -from zope.interface import implements, directlyProvides - -from twisted.python.failure import Failure -from twisted.internet import error -from twisted.internet import interfaces - -class TLSNegotiation: - def __init__(self, obj, connectState): - self.obj = obj - self.connectState = connectState - self.sent = False - self.readyToSend = connectState - - def __repr__(self): - return 'TLSNegotiation(%r)' % (self.obj,) - - def pretendToVerify(self, other, tpt): - # Set the transport problems list here? disconnections? - # hmmmmm... need some negative path tests. - - if not self.obj.iosimVerify(other.obj): - tpt.disconnectReason = NativeOpenSSLError() - tpt.loseConnection() - - -class FakeTransport: - """ - A wrapper around a file-like object to make it behave as a Transport. - - This doesn't actually stream the file to the attached protocol, - and is thus useful mainly as a utility for debugging protocols. - """ - implements(interfaces.ITransport, - interfaces.ITLSTransport) # ha ha not really - - _nextserial = itertools.count().next - closed = 0 - disconnecting = 0 - disconnected = 0 - disconnectReason = error.ConnectionDone("Connection done") - producer = None - streamingProducer = 0 - tls = None - - def __init__(self): - self.stream = [] - self.serial = self._nextserial() - - def __repr__(self): - return 'FakeTransport<%s,%s,%s>' % ( - self.isServer and 'S' or 'C', self.serial, - self.protocol.__class__.__name__) - - def write(self, data): - if self.tls is not None: - self.tlsbuf.append(data) - else: - self.stream.append(data) - - def _checkProducer(self): - # Cheating; this is called at "idle" times to allow producers to be - # found and dealt with - if self.producer: - self.producer.resumeProducing() - - def registerProducer(self, producer, streaming): - """From abstract.FileDescriptor - """ - self.producer = producer - self.streamingProducer = streaming - if not streaming: - producer.resumeProducing() - - def unregisterProducer(self): - self.producer = None - - def stopConsuming(self): - self.unregisterProducer() - self.loseConnection() - - def writeSequence(self, iovec): - self.write("".join(iovec)) - - def loseConnection(self): - self.disconnecting = True - - def reportDisconnect(self): - if self.tls is not None: - # We were in the middle of negotiating! Must have been a TLS problem. - err = NativeOpenSSLError() - else: - err = self.disconnectReason - self.protocol.connectionLost(Failure(err)) - - def getPeer(self): - # XXX: According to ITransport, this should return an IAddress! - return 'file', 'file' - - def getHost(self): - # XXX: According to ITransport, this should return an IAddress! - return 'file' - - def resumeProducing(self): - # Never sends data anyways - pass - - def pauseProducing(self): - # Never sends data anyways - pass - - def stopProducing(self): - self.loseConnection() - - def startTLS(self, contextFactory, beNormal=True): - # Nothing's using this feature yet, but startTLS has an undocumented - # second argument which defaults to true; if set to False, servers will - # behave like clients and clients will behave like servers. - connectState = self.isServer ^ beNormal - self.tls = TLSNegotiation(contextFactory, connectState) - self.tlsbuf = [] - - def getOutBuffer(self): - S = self.stream - if S: - self.stream = [] - return ''.join(S) - elif self.tls is not None: - if self.tls.readyToSend: - # Only _send_ the TLS negotiation "packet" if I'm ready to. - self.tls.sent = True - return self.tls - else: - return None - else: - return None - - def bufferReceived(self, buf): - if isinstance(buf, TLSNegotiation): - assert self.tls is not None # By the time you're receiving a - # negotiation, you have to have called - # startTLS already. - if self.tls.sent: - self.tls.pretendToVerify(buf, self) - self.tls = None # we're done with the handshake if we've gotten - # this far... although maybe it failed...? - # TLS started! Unbuffer... - b, self.tlsbuf = self.tlsbuf, None - self.writeSequence(b) - directlyProvides(self, interfaces.ISSLTransport) - else: - # We haven't sent our own TLS negotiation: time to do that! - self.tls.readyToSend = True - else: - self.protocol.dataReceived(buf) - - - -def makeFakeClient(c): - ft = FakeTransport() - ft.isServer = False - ft.protocol = c - return ft - -def makeFakeServer(s): - ft = FakeTransport() - ft.isServer = True - ft.protocol = s - return ft - -class IOPump: - """Utility to pump data between clients and servers for protocol testing. - - Perhaps this is a utility worthy of being in protocol.py? - """ - def __init__(self, client, server, clientIO, serverIO, debug): - self.client = client - self.server = server - self.clientIO = clientIO - self.serverIO = serverIO - self.debug = debug - - def flush(self, debug=False): - """Pump until there is no more input or output. - - Returns whether any data was moved. - """ - result = False - for x in range(1000): - if self.pump(debug): - result = True - else: - break - else: - assert 0, "Too long" - return result - - - def pump(self, debug=False): - """Move data back and forth. - - Returns whether any data was moved. - """ - if self.debug or debug: - print '-- GLUG --' - sData = self.serverIO.getOutBuffer() - cData = self.clientIO.getOutBuffer() - self.clientIO._checkProducer() - self.serverIO._checkProducer() - if self.debug or debug: - print '.' - # XXX slightly buggy in the face of incremental output - if cData: - print 'C: '+repr(cData) - if sData: - print 'S: '+repr(sData) - if cData: - self.serverIO.bufferReceived(cData) - if sData: - self.clientIO.bufferReceived(sData) - if cData or sData: - return True - if (self.serverIO.disconnecting and - not self.serverIO.disconnected): - if self.debug or debug: - print '* C' - self.serverIO.disconnected = True - self.clientIO.disconnecting = True - self.clientIO.reportDisconnect() - return True - if self.clientIO.disconnecting and not self.clientIO.disconnected: - if self.debug or debug: - print '* S' - self.clientIO.disconnected = True - self.serverIO.disconnecting = True - self.serverIO.reportDisconnect() - return True - return False - - -def connectedServerAndClient(ServerClass, ClientClass, - clientTransportFactory=makeFakeClient, - serverTransportFactory=makeFakeServer, - debug=False): - """Returns a 3-tuple: (client, server, pump) - """ - c = ClientClass() - s = ServerClass() - cio = clientTransportFactory(c) - sio = serverTransportFactory(s) - c.makeConnection(cio) - s.makeConnection(sio) - pump = IOPump(c, s, cio, sio, debug) - # kick off server greeting, etc - pump.flush() - return c, s, pump |