aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stringtransport.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stringtransport.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stringtransport.py279
1 files changed, 0 insertions, 279 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stringtransport.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stringtransport.py
deleted file mode 100755
index ca120982..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stringtransport.py
+++ /dev/null
@@ -1,279 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Tests for L{twisted.test.proto_helpers}.
-"""
-
-from zope.interface.verify import verifyObject
-
-from twisted.internet.interfaces import (ITransport, IPushProducer, IConsumer,
- IReactorTCP, IReactorSSL, IReactorUNIX, IAddress, IListeningPort,
- IConnector)
-from twisted.internet.address import IPv4Address
-from twisted.trial.unittest import TestCase
-from twisted.test.proto_helpers import (StringTransport, MemoryReactor,
- RaisingMemoryReactor)
-from twisted.internet.protocol import ClientFactory, Factory
-
-
-class StringTransportTests(TestCase):
- """
- Tests for L{twisted.test.proto_helpers.StringTransport}.
- """
- def setUp(self):
- self.transport = StringTransport()
-
-
- def test_interfaces(self):
- """
- L{StringTransport} instances provide L{ITransport}, L{IPushProducer},
- and L{IConsumer}.
- """
- self.assertTrue(verifyObject(ITransport, self.transport))
- self.assertTrue(verifyObject(IPushProducer, self.transport))
- self.assertTrue(verifyObject(IConsumer, self.transport))
-
-
- def test_registerProducer(self):
- """
- L{StringTransport.registerProducer} records the arguments supplied to
- it as instance attributes.
- """
- producer = object()
- streaming = object()
- self.transport.registerProducer(producer, streaming)
- self.assertIdentical(self.transport.producer, producer)
- self.assertIdentical(self.transport.streaming, streaming)
-
-
- def test_disallowedRegisterProducer(self):
- """
- L{StringTransport.registerProducer} raises L{RuntimeError} if a
- producer is already registered.
- """
- producer = object()
- self.transport.registerProducer(producer, True)
- self.assertRaises(
- RuntimeError, self.transport.registerProducer, object(), False)
- self.assertIdentical(self.transport.producer, producer)
- self.assertTrue(self.transport.streaming)
-
-
- def test_unregisterProducer(self):
- """
- L{StringTransport.unregisterProducer} causes the transport to forget
- about the registered producer and makes it possible to register a new
- one.
- """
- oldProducer = object()
- newProducer = object()
- self.transport.registerProducer(oldProducer, False)
- self.transport.unregisterProducer()
- self.assertIdentical(self.transport.producer, None)
- self.transport.registerProducer(newProducer, True)
- self.assertIdentical(self.transport.producer, newProducer)
- self.assertTrue(self.transport.streaming)
-
-
- def test_invalidUnregisterProducer(self):
- """
- L{StringTransport.unregisterProducer} raises L{RuntimeError} if called
- when no producer is registered.
- """
- self.assertRaises(RuntimeError, self.transport.unregisterProducer)
-
-
- def test_initialProducerState(self):
- """
- L{StringTransport.producerState} is initially C{'producing'}.
- """
- self.assertEqual(self.transport.producerState, 'producing')
-
-
- def test_pauseProducing(self):
- """
- L{StringTransport.pauseProducing} changes the C{producerState} of the
- transport to C{'paused'}.
- """
- self.transport.pauseProducing()
- self.assertEqual(self.transport.producerState, 'paused')
-
-
- def test_resumeProducing(self):
- """
- L{StringTransport.resumeProducing} changes the C{producerState} of the
- transport to C{'producing'}.
- """
- self.transport.pauseProducing()
- self.transport.resumeProducing()
- self.assertEqual(self.transport.producerState, 'producing')
-
-
- def test_stopProducing(self):
- """
- L{StringTransport.stopProducing} changes the C{'producerState'} of the
- transport to C{'stopped'}.
- """
- self.transport.stopProducing()
- self.assertEqual(self.transport.producerState, 'stopped')
-
-
- def test_stoppedTransportCannotPause(self):
- """
- L{StringTransport.pauseProducing} raises L{RuntimeError} if the
- transport has been stopped.
- """
- self.transport.stopProducing()
- self.assertRaises(RuntimeError, self.transport.pauseProducing)
-
-
- def test_stoppedTransportCannotResume(self):
- """
- L{StringTransport.resumeProducing} raises L{RuntimeError} if the
- transport has been stopped.
- """
- self.transport.stopProducing()
- self.assertRaises(RuntimeError, self.transport.resumeProducing)
-
-
- def test_disconnectingTransportCannotPause(self):
- """
- L{StringTransport.pauseProducing} raises L{RuntimeError} if the
- transport is being disconnected.
- """
- self.transport.loseConnection()
- self.assertRaises(RuntimeError, self.transport.pauseProducing)
-
-
- def test_disconnectingTransportCannotResume(self):
- """
- L{StringTransport.resumeProducing} raises L{RuntimeError} if the
- transport is being disconnected.
- """
- self.transport.loseConnection()
- self.assertRaises(RuntimeError, self.transport.resumeProducing)
-
-
- def test_loseConnectionSetsDisconnecting(self):
- """
- L{StringTransport.loseConnection} toggles the C{disconnecting} instance
- variable to C{True}.
- """
- self.assertFalse(self.transport.disconnecting)
- self.transport.loseConnection()
- self.assertTrue(self.transport.disconnecting)
-
-
- def test_specifiedHostAddress(self):
- """
- If a host address is passed to L{StringTransport.__init__}, that
- value is returned from L{StringTransport.getHost}.
- """
- address = object()
- self.assertIdentical(StringTransport(address).getHost(), address)
-
-
- def test_specifiedPeerAddress(self):
- """
- If a peer address is passed to L{StringTransport.__init__}, that
- value is returned from L{StringTransport.getPeer}.
- """
- address = object()
- self.assertIdentical(
- StringTransport(peerAddress=address).getPeer(), address)
-
-
- def test_defaultHostAddress(self):
- """
- If no host address is passed to L{StringTransport.__init__}, an
- L{IPv4Address} is returned from L{StringTransport.getHost}.
- """
- address = StringTransport().getHost()
- self.assertIsInstance(address, IPv4Address)
-
-
- def test_defaultPeerAddress(self):
- """
- If no peer address is passed to L{StringTransport.__init__}, an
- L{IPv4Address} is returned from L{StringTransport.getPeer}.
- """
- address = StringTransport().getPeer()
- self.assertIsInstance(address, IPv4Address)
-
-
-
-class ReactorTests(TestCase):
- """
- Tests for L{MemoryReactor} and L{RaisingMemoryReactor}.
- """
-
- def test_memoryReactorProvides(self):
- """
- L{MemoryReactor} provides all of the attributes described by the
- interfaces it advertises.
- """
- memoryReactor = MemoryReactor()
- verifyObject(IReactorTCP, memoryReactor)
- verifyObject(IReactorSSL, memoryReactor)
- verifyObject(IReactorUNIX, memoryReactor)
-
-
- def test_raisingReactorProvides(self):
- """
- L{RaisingMemoryReactor} provides all of the attributes described by the
- interfaces it advertises.
- """
- raisingReactor = RaisingMemoryReactor()
- verifyObject(IReactorTCP, raisingReactor)
- verifyObject(IReactorSSL, raisingReactor)
- verifyObject(IReactorUNIX, raisingReactor)
-
-
- def test_connectDestination(self):
- """
- L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and
- L{MemoryReactor.connectUNIX} will return an L{IConnector} whose
- C{getDestination} method returns an L{IAddress} with attributes which
- reflect the values passed.
- """
- memoryReactor = MemoryReactor()
- for connector in [memoryReactor.connectTCP(
- "test.example.com", 8321, ClientFactory()),
- memoryReactor.connectSSL(
- "test.example.com", 8321, ClientFactory(),
- None)]:
- verifyObject(IConnector, connector)
- address = connector.getDestination()
- verifyObject(IAddress, address)
- self.assertEqual(address.host, "test.example.com")
- self.assertEqual(address.port, 8321)
- connector = memoryReactor.connectUNIX("/fake/path", ClientFactory())
- verifyObject(IConnector, connector)
- address = connector.getDestination()
- verifyObject(IAddress, address)
- self.assertEqual(address.name, "/fake/path")
-
-
- def test_listenDefaultHost(self):
- """
- L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and
- L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose
- C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL}
- will have a default host of C{'0.0.0.0'}, and a port that reflects the
- value passed, and C{listenUNIX} will have a name that reflects the path
- passed.
- """
- memoryReactor = MemoryReactor()
- for port in [memoryReactor.listenTCP(8242, Factory()),
- memoryReactor.listenSSL(8242, Factory(), None)]:
- verifyObject(IListeningPort, port)
- address = port.getHost()
- verifyObject(IAddress, address)
- self.assertEqual(address.host, '0.0.0.0')
- self.assertEqual(address.port, 8242)
- port = memoryReactor.listenUNIX("/path/to/socket", Factory())
- verifyObject(IListeningPort, port)
- address = port.getHost()
- verifyObject(IAddress, address)
- self.assertEqual(address.name, "/path/to/socket")