diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_loopback.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_loopback.py | 419 |
1 files changed, 0 insertions, 419 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_loopback.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_loopback.py deleted file mode 100755 index f09908fb..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_loopback.py +++ /dev/null @@ -1,419 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Test case for L{twisted.protocols.loopback}. -""" - -from zope.interface import implements - -from twisted.trial import unittest -from twisted.trial.util import suppress as SUPPRESS -from twisted.protocols import basic, loopback -from twisted.internet import defer -from twisted.internet.protocol import Protocol -from twisted.internet.defer import Deferred -from twisted.internet.interfaces import IAddress, IPushProducer, IPullProducer -from twisted.internet import reactor, interfaces - - -class SimpleProtocol(basic.LineReceiver): - def __init__(self): - self.conn = defer.Deferred() - self.lines = [] - self.connLost = [] - - def connectionMade(self): - self.conn.callback(None) - - def lineReceived(self, line): - self.lines.append(line) - - def connectionLost(self, reason): - self.connLost.append(reason) - - -class DoomProtocol(SimpleProtocol): - i = 0 - def lineReceived(self, line): - self.i += 1 - if self.i < 4: - # by this point we should have connection closed, - # but just in case we didn't we won't ever send 'Hello 4' - self.sendLine("Hello %d" % self.i) - SimpleProtocol.lineReceived(self, line) - if self.lines[-1] == "Hello 3": - self.transport.loseConnection() - - -class LoopbackTestCaseMixin: - def testRegularFunction(self): - s = SimpleProtocol() - c = SimpleProtocol() - - def sendALine(result): - s.sendLine("THIS IS LINE ONE!") - s.transport.loseConnection() - s.conn.addCallback(sendALine) - - def check(ignored): - self.assertEqual(c.lines, ["THIS IS LINE ONE!"]) - self.assertEqual(len(s.connLost), 1) - self.assertEqual(len(c.connLost), 1) - d = defer.maybeDeferred(self.loopbackFunc, s, c) - d.addCallback(check) - return d - - def testSneakyHiddenDoom(self): - s = DoomProtocol() - c = DoomProtocol() - - def sendALine(result): - s.sendLine("DOOM LINE") - s.conn.addCallback(sendALine) - - def check(ignored): - self.assertEqual(s.lines, ['Hello 1', 'Hello 2', 'Hello 3']) - self.assertEqual(c.lines, ['DOOM LINE', 'Hello 1', 'Hello 2', 'Hello 3']) - self.assertEqual(len(s.connLost), 1) - self.assertEqual(len(c.connLost), 1) - d = defer.maybeDeferred(self.loopbackFunc, s, c) - d.addCallback(check) - return d - - - -class LoopbackAsyncTestCase(LoopbackTestCaseMixin, unittest.TestCase): - loopbackFunc = staticmethod(loopback.loopbackAsync) - - - def test_makeConnection(self): - """ - Test that the client and server protocol both have makeConnection - invoked on them by loopbackAsync. - """ - class TestProtocol(Protocol): - transport = None - def makeConnection(self, transport): - self.transport = transport - - server = TestProtocol() - client = TestProtocol() - loopback.loopbackAsync(server, client) - self.failIfEqual(client.transport, None) - self.failIfEqual(server.transport, None) - - - def _hostpeertest(self, get, testServer): - """ - Test one of the permutations of client/server host/peer. - """ - class TestProtocol(Protocol): - def makeConnection(self, transport): - Protocol.makeConnection(self, transport) - self.onConnection.callback(transport) - - if testServer: - server = TestProtocol() - d = server.onConnection = Deferred() - client = Protocol() - else: - server = Protocol() - client = TestProtocol() - d = client.onConnection = Deferred() - - loopback.loopbackAsync(server, client) - - def connected(transport): - host = getattr(transport, get)() - self.failUnless(IAddress.providedBy(host)) - - return d.addCallback(connected) - - - def test_serverHost(self): - """ - Test that the server gets a transport with a properly functioning - implementation of L{ITransport.getHost}. - """ - return self._hostpeertest("getHost", True) - - - def test_serverPeer(self): - """ - Like C{test_serverHost} but for L{ITransport.getPeer} - """ - return self._hostpeertest("getPeer", True) - - - def test_clientHost(self, get="getHost"): - """ - Test that the client gets a transport with a properly functioning - implementation of L{ITransport.getHost}. - """ - return self._hostpeertest("getHost", False) - - - def test_clientPeer(self): - """ - Like C{test_clientHost} but for L{ITransport.getPeer}. - """ - return self._hostpeertest("getPeer", False) - - - def _greetingtest(self, write, testServer): - """ - Test one of the permutations of write/writeSequence client/server. - """ - class GreeteeProtocol(Protocol): - bytes = "" - def dataReceived(self, bytes): - self.bytes += bytes - if self.bytes == "bytes": - self.received.callback(None) - - class GreeterProtocol(Protocol): - def connectionMade(self): - getattr(self.transport, write)("bytes") - - if testServer: - server = GreeterProtocol() - client = GreeteeProtocol() - d = client.received = Deferred() - else: - server = GreeteeProtocol() - d = server.received = Deferred() - client = GreeterProtocol() - - loopback.loopbackAsync(server, client) - return d - - - def test_clientGreeting(self): - """ - Test that on a connection where the client speaks first, the server - receives the bytes sent by the client. - """ - return self._greetingtest("write", False) - - - def test_clientGreetingSequence(self): - """ - Like C{test_clientGreeting}, but use C{writeSequence} instead of - C{write} to issue the greeting. - """ - return self._greetingtest("writeSequence", False) - - - def test_serverGreeting(self, write="write"): - """ - Test that on a connection where the server speaks first, the client - receives the bytes sent by the server. - """ - return self._greetingtest("write", True) - - - def test_serverGreetingSequence(self): - """ - Like C{test_serverGreeting}, but use C{writeSequence} instead of - C{write} to issue the greeting. - """ - return self._greetingtest("writeSequence", True) - - - def _producertest(self, producerClass): - toProduce = map(str, range(0, 10)) - - class ProducingProtocol(Protocol): - def connectionMade(self): - self.producer = producerClass(list(toProduce)) - self.producer.start(self.transport) - - class ReceivingProtocol(Protocol): - bytes = "" - def dataReceived(self, bytes): - self.bytes += bytes - if self.bytes == ''.join(toProduce): - self.received.callback((client, server)) - - server = ProducingProtocol() - client = ReceivingProtocol() - client.received = Deferred() - - loopback.loopbackAsync(server, client) - return client.received - - - def test_pushProducer(self): - """ - Test a push producer registered against a loopback transport. - """ - class PushProducer(object): - implements(IPushProducer) - resumed = False - - def __init__(self, toProduce): - self.toProduce = toProduce - - def resumeProducing(self): - self.resumed = True - - def start(self, consumer): - self.consumer = consumer - consumer.registerProducer(self, True) - self._produceAndSchedule() - - def _produceAndSchedule(self): - if self.toProduce: - self.consumer.write(self.toProduce.pop(0)) - reactor.callLater(0, self._produceAndSchedule) - else: - self.consumer.unregisterProducer() - d = self._producertest(PushProducer) - - def finished((client, server)): - self.failIf( - server.producer.resumed, - "Streaming producer should not have been resumed.") - d.addCallback(finished) - return d - - - def test_pullProducer(self): - """ - Test a pull producer registered against a loopback transport. - """ - class PullProducer(object): - implements(IPullProducer) - - def __init__(self, toProduce): - self.toProduce = toProduce - - def start(self, consumer): - self.consumer = consumer - self.consumer.registerProducer(self, False) - - def resumeProducing(self): - self.consumer.write(self.toProduce.pop(0)) - if not self.toProduce: - self.consumer.unregisterProducer() - return self._producertest(PullProducer) - - - def test_writeNotReentrant(self): - """ - L{loopback.loopbackAsync} does not call a protocol's C{dataReceived} - method while that protocol's transport's C{write} method is higher up - on the stack. - """ - class Server(Protocol): - def dataReceived(self, bytes): - self.transport.write("bytes") - - class Client(Protocol): - ready = False - - def connectionMade(self): - reactor.callLater(0, self.go) - - def go(self): - self.transport.write("foo") - self.ready = True - - def dataReceived(self, bytes): - self.wasReady = self.ready - self.transport.loseConnection() - - - server = Server() - client = Client() - d = loopback.loopbackAsync(client, server) - def cbFinished(ignored): - self.assertTrue(client.wasReady) - d.addCallback(cbFinished) - return d - - - def test_pumpPolicy(self): - """ - The callable passed as the value for the C{pumpPolicy} parameter to - L{loopbackAsync} is called with a L{_LoopbackQueue} of pending bytes - and a protocol to which they should be delivered. - """ - pumpCalls = [] - def dummyPolicy(queue, target): - bytes = [] - while queue: - bytes.append(queue.get()) - pumpCalls.append((target, bytes)) - - client = Protocol() - server = Protocol() - - finished = loopback.loopbackAsync(server, client, dummyPolicy) - self.assertEqual(pumpCalls, []) - - client.transport.write("foo") - client.transport.write("bar") - server.transport.write("baz") - server.transport.write("quux") - server.transport.loseConnection() - - def cbComplete(ignored): - self.assertEqual( - pumpCalls, - # The order here is somewhat arbitrary. The implementation - # happens to always deliver data to the client first. - [(client, ["baz", "quux", None]), - (server, ["foo", "bar"])]) - finished.addCallback(cbComplete) - return finished - - - def test_identityPumpPolicy(self): - """ - L{identityPumpPolicy} is a pump policy which calls the target's - C{dataReceived} method one for each string in the queue passed to it. - """ - bytes = [] - client = Protocol() - client.dataReceived = bytes.append - queue = loopback._LoopbackQueue() - queue.put("foo") - queue.put("bar") - queue.put(None) - - loopback.identityPumpPolicy(queue, client) - - self.assertEqual(bytes, ["foo", "bar"]) - - - def test_collapsingPumpPolicy(self): - """ - L{collapsingPumpPolicy} is a pump policy which calls the target's - C{dataReceived} only once with all of the strings in the queue passed - to it joined together. - """ - bytes = [] - client = Protocol() - client.dataReceived = bytes.append - queue = loopback._LoopbackQueue() - queue.put("foo") - queue.put("bar") - queue.put(None) - - loopback.collapsingPumpPolicy(queue, client) - - self.assertEqual(bytes, ["foobar"]) - - - -class LoopbackTCPTestCase(LoopbackTestCaseMixin, unittest.TestCase): - loopbackFunc = staticmethod(loopback.loopbackTCP) - - -class LoopbackUNIXTestCase(LoopbackTestCaseMixin, unittest.TestCase): - loopbackFunc = staticmethod(loopback.loopbackUNIX) - - if interfaces.IReactorUNIX(reactor, None) is None: - skip = "Current reactor does not support UNIX sockets" |